Skip to content

Commit

Permalink
Cherry-picks for 2.10.18-RC.1 (#5634)
Browse files Browse the repository at this point in the history
Includes:

- #5610
- #5618
- #5616
- #5627
- #5629
- #5630
- #5632
- #5635
  • Loading branch information
wallyqs authored Jul 9, 2024
2 parents b91de03 + 8186bd0 commit d84c468
Show file tree
Hide file tree
Showing 9 changed files with 147 additions and 33 deletions.
4 changes: 2 additions & 2 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@ language: go
go:
# This should be quoted or use .x, but should not be unquoted.
# Remember that a YAML bare float drops trailing zeroes.
- "1.22.4"
- "1.21.11"
- "1.22.5"
- "1.21.12"

go_import_path: github.com/nats-io/nats-server

Expand Down
8 changes: 4 additions & 4 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,13 @@ go 1.21

require (
github.com/klauspost/compress v1.17.9
github.com/minio/highwayhash v1.0.2
github.com/nats-io/jwt/v2 v2.5.7
github.com/minio/highwayhash v1.0.3
github.com/nats-io/jwt/v2 v2.5.8
github.com/nats-io/nats.go v1.36.0
github.com/nats-io/nkeys v0.4.7
github.com/nats-io/nuid v1.0.1
go.uber.org/automaxprocs v1.5.3
golang.org/x/crypto v0.24.0
golang.org/x/sys v0.21.0
golang.org/x/crypto v0.25.0
golang.org/x/sys v0.22.0
golang.org/x/time v0.5.0
)
16 changes: 8 additions & 8 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,10 @@ github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/klauspost/compress v1.17.9 h1:6KIumPrER1LHsvBVuDa0r5xaG0Es51mhhB9BQB2qeMA=
github.com/klauspost/compress v1.17.9/go.mod h1:Di0epgTjJY877eYKx5yC51cX2A2Vl2ibi7bDH9ttBbw=
github.com/minio/highwayhash v1.0.2 h1:Aak5U0nElisjDCfPSG79Tgzkn2gl66NxOMspRrKnA/g=
github.com/minio/highwayhash v1.0.2/go.mod h1:BQskDq+xkJ12lmlUUi7U0M5Swg3EWR+dLTk+kldvVxY=
github.com/nats-io/jwt/v2 v2.5.7 h1:j5lH1fUXCnJnY8SsQeB/a/z9Azgu2bYIDvtPVNdxe2c=
github.com/nats-io/jwt/v2 v2.5.7/go.mod h1:ZdWS1nZa6WMZfFwwgpEaqBV8EPGVgOTDHN/wTbz0Y5A=
github.com/minio/highwayhash v1.0.3 h1:kbnuUMoHYyVl7szWjSxJnxw11k2U709jqFPPmIUyD6Q=
github.com/minio/highwayhash v1.0.3/go.mod h1:GGYsuwP/fPD6Y9hMiXuapVvlIUEhFhMTh0rxU3ik1LQ=
github.com/nats-io/jwt/v2 v2.5.8 h1:uvdSzwWiEGWGXf+0Q+70qv6AQdvcvxrv9hPM0RiPamE=
github.com/nats-io/jwt/v2 v2.5.8/go.mod h1:ZdWS1nZa6WMZfFwwgpEaqBV8EPGVgOTDHN/wTbz0Y5A=
github.com/nats-io/nats.go v1.36.0 h1:suEUPuWzTSse/XhESwqLxXGuj8vGRuPRoG7MoRN/qyU=
github.com/nats-io/nats.go v1.36.0/go.mod h1:Ubdu4Nh9exXdSz0RVWRFBbRfrbSxOYd26oF0wkWclB8=
github.com/nats-io/nkeys v0.4.7 h1:RwNJbbIdYCoClSDNY7QVKZlyb/wfT6ugvFCiKy6vDvI=
Expand All @@ -20,11 +20,11 @@ github.com/stretchr/testify v1.7.1 h1:5TQK59W5E3v0r2duFAb7P95B6hEeOyEnHRa8MjYSMT
github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
go.uber.org/automaxprocs v1.5.3 h1:kWazyxZUrS3Gs4qUpbwo5kEIMGe/DAvi5Z4tl2NW4j8=
go.uber.org/automaxprocs v1.5.3/go.mod h1:eRbA25aqJrxAbsLO0xy5jVwPt7FQnRgjW+efnwa1WM0=
golang.org/x/crypto v0.24.0 h1:mnl8DM0o513X8fdIkmyFE/5hTYxbwYOjDS/+rK6qpRI=
golang.org/x/crypto v0.24.0/go.mod h1:Z1PMYSOR5nyMcyAVAIQSKCDwalqy85Aqn1x3Ws4L5DM=
golang.org/x/sys v0.0.0-20190130150945-aca44879d564/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.21.0 h1:rF+pYz3DAGSQAxAu1CbC7catZg4ebC4UIeIhKxBZvws=
golang.org/x/crypto v0.25.0 h1:ypSNr+bnYL2YhwoMt2zPxHFmbAN1KZs/njMG3hxUp30=
golang.org/x/crypto v0.25.0/go.mod h1:T+wALwcMOSE0kXgUAnPAHqTLW+XHgcELELW8VaDgm/M=
golang.org/x/sys v0.21.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/sys v0.22.0 h1:RI27ohtqKCnwULzJLqkv897zojh5/DwS/ENaMzUOaWI=
golang.org/x/sys v0.22.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/time v0.5.0 h1:o7cqy6amK/52YcAKIPlM3a+Fpj35zvRj2TP+e1xFSfk=
golang.org/x/time v0.5.0/go.mod h1:3BpzKBy/shNhVucY/MWOyx10tF3SFh9QdLuxbVysPQM=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
Expand Down
3 changes: 2 additions & 1 deletion server/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -4154,7 +4154,8 @@ func (o *consumer) checkNumPending() uint64 {
if o.mset != nil {
var state StreamState
o.mset.store.FastState(&state)
if o.sseq > state.LastSeq && o.npc != 0 || o.npc > int64(state.Msgs) {
npc := o.numPending()
if o.sseq > state.LastSeq && npc > 0 || npc > state.Msgs {
// Re-calculate.
o.streamNumPending()
}
Expand Down
77 changes: 63 additions & 14 deletions server/filestore.go
Original file line number Diff line number Diff line change
Expand Up @@ -2318,11 +2318,19 @@ func (mb *msgBlock) firstMatching(filter string, wc bool, start uint64, sm *Stor

if filter == _EMPTY_ {
filter = fwcs
wc = true
}

// If we only have 1 subject currently and it matches our filter we can also set isAll.
if !isAll && mb.fss.Size() == 1 {
_, isAll = mb.fss.Find(stringToBytes(filter))
if !wc {
_, isAll = mb.fss.Find(stringToBytes(filter))
} else {
// Since mb.fss.Find won't work if filter is a wildcard, need to use Match instead.
mb.fss.Match(stringToBytes(filter), func(subject []byte, _ *SimpleState) {
isAll = true
})
}
}
// Make sure to start at mb.first.seq if fseq < mb.first.seq
if seq := atomic.LoadUint64(&mb.first.seq); seq > fseq {
Expand Down Expand Up @@ -2461,6 +2469,7 @@ func (mb *msgBlock) filteredPendingLocked(filter string, wc bool, sseq uint64) (

if filter == _EMPTY_ {
filter = fwcs
wc = true
}

update := func(ss *SimpleState) {
Expand Down Expand Up @@ -2494,6 +2503,10 @@ func (mb *msgBlock) filteredPendingLocked(filter string, wc bool, sseq uint64) (

var havePartial bool
mb.fss.Match(stringToBytes(filter), func(bsubj []byte, ss *SimpleState) {
if havePartial {
// If we already found a partial then don't do anything else.
return
}
if ss.firstNeedsUpdate {
mb.recalculateFirstForSubj(bytesToString(bsubj), ss.First, ss)
}
Expand Down Expand Up @@ -2616,16 +2629,43 @@ func (fs *fileStore) checkSkipFirstBlock(filter string, wc bool) (int, int) {
}
// Here we need to translate this to index into fs.blks properly.
mb := fs.bim[start]
if mb == nil {
// psim fblk can be lazy.
i := start + 1
for ; i <= stop; i++ {
mb = fs.bim[i]
if mb == nil {
continue
}
if _, f, _ := mb.filteredPending(filter, wc, 0); f > 0 {
break
}
}
// Update fblk since fblk was outdated.
if !wc {
if psi, ok := fs.psim.Find(stringToBytes(filter)); ok {
psi.fblk = i
}
} else {
fs.psim.Match(stringToBytes(filter), func(subj []byte, psi *psi) {
if i > psi.fblk {
psi.fblk = i
}
})
}
}
// Still nothing.
if mb == nil {
return -1, -1
}
// Grab first index.
fi, _ := fs.selectMsgBlockWithIndex(atomic.LoadUint64(&mb.last.seq))

mb = fs.bim[stop]
if mb == nil {
return -1, -1
// Grab last if applicable.
var li int
if mb = fs.bim[stop]; mb != nil {
li, _ = fs.selectMsgBlockWithIndex(atomic.LoadUint64(&mb.last.seq))
}
li, _ := fs.selectMsgBlockWithIndex(atomic.LoadUint64(&mb.last.seq))

return fi, li
}
Expand Down Expand Up @@ -2752,7 +2792,12 @@ func (fs *fileStore) SubjectsState(subject string) map[string]SimpleState {
if !ok {
return nil
}
start, stop = fs.bim[info.fblk], fs.bim[info.lblk]
if f := fs.bim[info.fblk]; f != nil {
start = f
}
if l := fs.bim[info.lblk]; l != nil {
stop = l
}
}

// Aggregate fss.
Expand Down Expand Up @@ -2828,16 +2873,14 @@ func (fs *fileStore) NumPending(sseq uint64, filter string, lastPerSubject bool)
}

isAll := filter == _EMPTY_ || filter == fwcs
if isAll && filter == _EMPTY_ {
filter = fwcs
}
wc := subjectHasWildcard(filter)

// See if filter was provided but its the only subject.
if !isAll && !wc && fs.psim.Size() == 1 {
if _, ok := fs.psim.Find(stringToBytes(filter)); ok {
isAll = true
}
}
if isAll && filter == _EMPTY_ {
filter = fwcs
_, isAll = fs.psim.Find(stringToBytes(filter))
}
// If we are isAll and have no deleted we can do a simpler calculation.
if !lastPerSubject && isAll && (fs.state.LastSeq-fs.state.FirstSeq+1) == fs.state.Msgs {
Expand Down Expand Up @@ -2960,6 +3003,10 @@ func (fs *fileStore) NumPending(sseq uint64, filter string, lastPerSubject bool)

var havePartial bool
mb.fss.Match(stringToBytes(filter), func(bsubj []byte, ss *SimpleState) {
if havePartial {
// If we already found a partial then don't do anything else.
return
}
subj := bytesToString(bsubj)
if ss.firstNeedsUpdate {
mb.recalculateFirstForSubj(subj, ss.First, ss)
Expand Down Expand Up @@ -3711,11 +3758,12 @@ func (fs *fileStore) enforceMsgPerSubjectLimit(fireCallback bool) {

// collect all that are not correct.
needAttention := make(map[string]*psi)
fs.psim.Match([]byte(fwcs), func(subj []byte, psi *psi) {
fs.psim.Iter(func(subj []byte, psi *psi) bool {
numMsgs += psi.total
if psi.total > maxMsgsPer {
needAttention[string(subj)] = psi
}
return true
})

// We had an issue with a use case where psim (and hence fss) were correct but idx was not and was not properly being caught.
Expand All @@ -3735,10 +3783,11 @@ func (fs *fileStore) enforceMsgPerSubjectLimit(fireCallback bool) {
fs.rebuildStateLocked(nil)
// Need to redo blocks that need attention.
needAttention = make(map[string]*psi)
fs.psim.Match([]byte(fwcs), func(subj []byte, psi *psi) {
fs.psim.Iter(func(subj []byte, psi *psi) bool {
if psi.total > maxMsgsPer {
needAttention[string(subj)] = psi
}
return true
})
}

Expand Down
34 changes: 34 additions & 0 deletions server/filestore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7214,6 +7214,40 @@ func TestFileStoreLargeSparseMsgsDoNotLoadAfterLast(t *testing.T) {
require_Equal(t, loaded, 1)
}

func TestFileStoreCheckSkipFirstBlockBug(t *testing.T) {
sd := t.TempDir()
fs, err := newFileStore(
FileStoreConfig{StoreDir: sd, BlockSize: 128},
StreamConfig{Name: "zzz", Subjects: []string{"foo.*.*"}, Storage: FileStorage})
require_NoError(t, err)
defer fs.Stop()

msg := []byte("hello")

fs.StoreMsg("foo.BB.bar", nil, msg)
fs.StoreMsg("foo.BB.bar", nil, msg)
fs.StoreMsg("foo.AA.bar", nil, msg)
for i := 0; i < 5; i++ {
fs.StoreMsg("foo.BB.bar", nil, msg)
}
fs.StoreMsg("foo.AA.bar", nil, msg)
fs.StoreMsg("foo.AA.bar", nil, msg)

// Should have created 4 blocks.
// BB BB | AA BB | BB BB | BB BB | AA AA
require_Equal(t, fs.numMsgBlocks(), 5)

fs.RemoveMsg(3)
fs.RemoveMsg(4)

// Second block should be gone now.
// BB BB | -- -- | BB BB | BB BB | AA AA
require_Equal(t, fs.numMsgBlocks(), 4)

_, _, err = fs.LoadNextMsg("foo.AA.bar", false, 4, nil)
require_NoError(t, err)
}

///////////////////////////////////////////////////////////////////////////
// Benchmarks
///////////////////////////////////////////////////////////////////////////
Expand Down
9 changes: 6 additions & 3 deletions server/memstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -426,9 +426,8 @@ func (ms *memStore) filteredStateLocked(sseq uint64, filter string, lastPerSubje
var havePartial bool
// We will track start and end sequences as we go.
ms.fss.Match(stringToBytes(filter), func(subj []byte, fss *SimpleState) {
subjs := bytesToString(subj)
if fss.firstNeedsUpdate {
ms.recalculateFirstForSubj(subjs, fss.First, fss)
ms.recalculateFirstForSubj(bytesToString(subj), fss.First, fss)
}
if sseq <= fss.First {
update(fss)
Expand Down Expand Up @@ -1219,7 +1218,11 @@ func (ms *memStore) removeSeqPerSubject(subj string, seq uint64) {
// Will recalculate the first sequence for this subject in this block.
// Lock should be held.
func (ms *memStore) recalculateFirstForSubj(subj string, startSeq uint64, ss *SimpleState) {
for tseq := startSeq + 1; tseq <= ss.Last; tseq++ {
tseq := startSeq + 1
if tseq < ms.state.FirstSeq {
tseq = ms.state.FirstSeq
}
for ; tseq <= ss.Last; tseq++ {
if sm := ms.msgs[tseq]; sm != nil && sm.subj == subj {
ss.First = tseq
ss.firstNeedsUpdate = false
Expand Down
1 change: 1 addition & 0 deletions server/stree/stree.go
Original file line number Diff line number Diff line change
Expand Up @@ -340,6 +340,7 @@ func (t *SubjectTree[T]) match(n node, parts [][]byte, pre []byte, cb func(subje
t.match(cn, nparts, pre, cb)
}
}
return
}
// Here we have normal traversal, so find the next child.
nn := n.findChild(p)
Expand Down
28 changes: 27 additions & 1 deletion server/stree/stree_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,10 @@ func TestSubjectTreeBasics(t *testing.T) {
require_True(t, old == nil)
require_False(t, updated)
require_Equal(t, st.Size(), 1)
// Find with single leaf.
// Find shouldn't work with a wildcard.
_, found := st.Find(b("foo.bar.*"))
require_False(t, found)
// But it should with a literal. Find with single leaf.
v, found := st.Find(b("foo.bar.baz"))
require_True(t, found)
require_Equal(t, *v, 22)
Expand Down Expand Up @@ -738,3 +741,26 @@ func TestSubjectTreeNode48(t *testing.T) {
require_True(t, gotB)
require_True(t, gotC)
}

func TestSubjectTreeMatchNoCallbackDupe(t *testing.T) {
st := NewSubjectTree[int]()
st.Insert(b("foo.bar.A"), 1)
st.Insert(b("foo.bar.B"), 1)
st.Insert(b("foo.bar.C"), 1)
st.Insert(b("foo.bar.>"), 1)

for _, f := range [][]byte{
[]byte(">"),
[]byte("foo.>"),
[]byte("foo.bar.>"),
} {
seen := map[string]struct{}{}
st.Match(f, func(bsubj []byte, _ *int) {
subj := string(bsubj)
if _, ok := seen[subj]; ok {
t.Logf("Match callback was called twice for %q", subj)
}
seen[subj] = struct{}{}
})
}
}

0 comments on commit d84c468

Please sign in to comment.