Skip to content

Commit

Permalink
Add NumInterest to optimise searching for number of sublist entries
Browse files Browse the repository at this point in the history
Signed-off-by: Neil Twigg <[email protected]>
  • Loading branch information
neilalexander committed Sep 23, 2024
1 parent cf6f2c7 commit 8460ad2
Show file tree
Hide file tree
Showing 6 changed files with 217 additions and 22 deletions.
4 changes: 2 additions & 2 deletions server/accounts.go
Original file line number Diff line number Diff line change
Expand Up @@ -883,8 +883,8 @@ func (a *Account) Interest(subject string) int {
var nms int
a.mu.RLock()
if a.sl != nil {
res := a.sl.Match(subject)
nms = len(res.psubs) + len(res.qsubs)
np, nq := a.sl.NumInterest(subject)
nms = np + nq
}
a.mu.RUnlock()
return nms
Expand Down
10 changes: 5 additions & 5 deletions server/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -3234,7 +3234,7 @@ func (c *client) processUnsub(arg []byte) error {
func (c *client) checkDenySub(subject string) bool {
if denied, ok := c.mperms.dcache[subject]; ok {
return denied
} else if r := c.mperms.deny.Match(subject); len(r.psubs) != 0 {
} else if np, _ := c.mperms.deny.NumInterest(subject); np != 0 {
c.mperms.dcache[subject] = true
return true
} else {
Expand Down Expand Up @@ -3819,13 +3819,13 @@ func (c *client) pubAllowedFullCheck(subject string, fullCheck, hasLock bool) bo
allowed := true
// Cache miss, check allow then deny as needed.
if c.perms.pub.allow != nil {
r := c.perms.pub.allow.Match(subject)
allowed = len(r.psubs) != 0
np, _ := c.perms.pub.allow.NumInterest(subject)
allowed = np != 0
}
// If we have a deny list and are currently allowed, check that as well.
if allowed && c.perms.pub.deny != nil {
r := c.perms.pub.deny.Match(subject)
allowed = len(r.psubs) == 0
np, _ := c.perms.pub.deny.NumInterest(subject)
allowed = np == 0
}

// If we are currently not allowed but we are tracking reply subjects
Expand Down
12 changes: 6 additions & 6 deletions server/jetstream_cluster_4_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2544,10 +2544,10 @@ func TestJetStreamClusterAccountNRG(t *testing.T) {
// Check account interest for the AppendEntry subject.
checkFor(t, time.Second, time.Millisecond*25, func() error {
for _, s := range c.servers {
if !s.sys.account.sl.hasInterest(rg.asubj, true) {
if !s.sys.account.sl.HasInterest(rg.asubj) {
return fmt.Errorf("system account should have interest")
}
if s.gacc.sl.hasInterest(rg.asubj, true) {
if s.gacc.sl.HasInterest(rg.asubj) {
return fmt.Errorf("global account shouldn't have interest")
}
}
Expand Down Expand Up @@ -2587,10 +2587,10 @@ func TestJetStreamClusterAccountNRG(t *testing.T) {
// Check account interest for the AppendEntry subject.
checkFor(t, time.Second, time.Millisecond*25, func() error {
for _, s := range c.servers {
if !s.sys.account.sl.hasInterest(rg.asubj, true) {
if !s.sys.account.sl.HasInterest(rg.asubj) {
return fmt.Errorf("system account should have interest")
}
if s.gacc.sl.hasInterest(rg.asubj, true) {
if s.gacc.sl.HasInterest(rg.asubj) {
return fmt.Errorf("global account shouldn't have interest")
}
}
Expand Down Expand Up @@ -2625,10 +2625,10 @@ func TestJetStreamClusterAccountNRG(t *testing.T) {
// Check account interest for the AppendEntry subject.
checkFor(t, time.Second, time.Millisecond*25, func() error {
for _, s := range c.servers {
if s.sys.account.sl.hasInterest(rg.asubj, true) {
if s.sys.account.sl.HasInterest(rg.asubj) {
return fmt.Errorf("system account shouldn't have interest")
}
if !s.gacc.sl.hasInterest(rg.asubj, true) {
if !s.gacc.sl.HasInterest(rg.asubj) {
return fmt.Errorf("global account should have interest")
}
}
Expand Down
8 changes: 4 additions & 4 deletions server/mqtt.go
Original file line number Diff line number Diff line change
Expand Up @@ -4509,13 +4509,13 @@ func generatePubPerms(perms *Permissions) *perm {
func pubAllowed(perms *perm, subject string) bool {
allowed := true
if perms.allow != nil {
r := perms.allow.Match(subject)
allowed = len(r.psubs) != 0
np, _ := perms.allow.NumInterest(subject)
allowed = np != 0
}
// If we have a deny list and are currently allowed, check that as well.
if allowed && perms.deny != nil {
r := perms.deny.Match(subject)
allowed = len(r.psubs) == 0
np, _ := perms.deny.NumInterest(subject)
allowed = np == 0
}
return allowed
}
Expand Down
33 changes: 28 additions & 5 deletions server/sublist.go
Original file line number Diff line number Diff line change
Expand Up @@ -539,7 +539,14 @@ func (s *Sublist) MatchBytes(subject []byte) *SublistResult {
// HasInterest will return whether or not there is any interest in the subject.
// In cases where more detail is not required, this may be faster than Match.
func (s *Sublist) HasInterest(subject string) bool {
return s.hasInterest(subject, true)
return s.hasInterest(subject, true, nil, nil)
}

// NumInterest will return the number of subs/qsubs interested in the subject.
// In cases where more detail is not required, this may be faster than Match.
func (s *Sublist) NumInterest(subject string) (np, nq int) {
s.hasInterest(subject, true, &np, &nq)
return
}

func (s *Sublist) matchNoLock(subject string) *SublistResult {
Expand Down Expand Up @@ -623,14 +630,18 @@ func (s *Sublist) match(subject string, doLock bool, doCopyOnCache bool) *Sublis
return result
}

func (s *Sublist) hasInterest(subject string, doLock bool) bool {
func (s *Sublist) hasInterest(subject string, doLock bool, np, nq *int) bool {
// Check cache first.
if doLock {
s.RLock()
}
var matched bool
if s.cache != nil {
if r, ok := s.cache[subject]; ok {
if np != nil && nq != nil {
*np += len(r.psubs)
*nq += len(r.qsubs)
}
matched = len(r.psubs)+len(r.qsubs) > 0
}
}
Expand Down Expand Up @@ -663,7 +674,7 @@ func (s *Sublist) hasInterest(subject string, doLock bool) bool {
s.RLock()
defer s.RUnlock()
}
return matchLevelForAny(s.root, tokens)
return matchLevelForAny(s.root, tokens, np, nq)
}

// Remove entries in the cache until we are under the maximum.
Expand Down Expand Up @@ -778,17 +789,21 @@ func matchLevel(l *level, toks []string, results *SublistResult) {
}
}

func matchLevelForAny(l *level, toks []string) bool {
func matchLevelForAny(l *level, toks []string, np, nq *int) bool {
var pwc, n *node
for i, t := range toks {
if l == nil {
return false
}
if l.fwc != nil {
if np != nil && nq != nil {
*np += len(l.fwc.psubs)
*nq += len(l.fwc.qsubs)
}
return true
}
if pwc = l.pwc; pwc != nil {
if match := matchLevelForAny(pwc.next, toks[i+1:]); match {
if match := matchLevelForAny(pwc.next, toks[i+1:], np, nq); match {
return true
}
}
Expand All @@ -800,9 +815,17 @@ func matchLevelForAny(l *level, toks []string) bool {
}
}
if n != nil {
if np != nil && nq != nil {
*np += len(n.psubs)
*nq += len(n.qsubs)
}
return len(n.plist) > 0 || len(n.psubs) > 0 || len(n.qsubs) > 0
}
if pwc != nil {
if np != nil && nq != nil {
*np += len(pwc.psubs)
*nq += len(pwc.qsubs)
}
return len(pwc.plist) > 0 || len(pwc.psubs) > 0 || len(pwc.qsubs) > 0
}
return false
Expand Down
172 changes: 172 additions & 0 deletions server/sublist_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1768,6 +1768,178 @@ func TestSublistHasInterest(t *testing.T) {
sl.Remove(qsub)
}

func TestSublistNumInterest(t *testing.T) {
sl := NewSublistWithCache()
fooSub := newSub("foo")
sl.Insert(fooSub)

require_NumInterest := func(t *testing.T, subj string, wnp, wnq int) {
t.Helper()
np, nq := sl.NumInterest(subj)
require_Equal(t, np, wnp)
require_Equal(t, nq, wnq)
}

// Expect to find that "foo" matches but "bar" doesn't.
// At this point nothing should be in the cache.
require_NumInterest(t, "foo", 1, 0)
require_NumInterest(t, "bar", 0, 0)
require_Equal(t, sl.cacheHits, 0)

// Now call Match(), which will populate the cache.
sl.Match("foo")
require_Equal(t, sl.cacheHits, 0)

// Future calls to HasInterest() should hit the cache now.
for i := uint64(1); i <= 5; i++ {
require_NumInterest(t, "foo", 1, 0)
require_Equal(t, sl.cacheHits, i)
}

// Call Match on a subject we know there is no match.
sl.Match("bar")
require_NumInterest(t, "bar", 0, 0)

// Remove fooSub and check interest again
sl.Remove(fooSub)
require_NumInterest(t, "foo", 0, 0)

// Try with some wildcards
sub := newSub("foo.*")
sl.Insert(sub)
require_NumInterest(t, "foo", 0, 0)
require_NumInterest(t, "foo.bar", 1, 0)
require_NumInterest(t, "foo.bar.baz", 0, 0)

// Remove sub, there should be no interest
sl.Remove(sub)
require_NumInterest(t, "foo", 0, 0)
require_NumInterest(t, "foo.bar", 0, 0)
require_NumInterest(t, "foo.bar.baz", 0, 0)

sub = newSub("foo.>")
sl.Insert(sub)
require_NumInterest(t, "foo", 0, 0)
require_NumInterest(t, "foo.bar", 1, 0)
require_NumInterest(t, "foo.bar.baz", 1, 0)

sl.Remove(sub)
require_NumInterest(t, "foo", 0, 0)
require_NumInterest(t, "foo.bar", 0, 0)
require_NumInterest(t, "foo.bar.baz", 0, 0)

sub = newSub("*.>")
sl.Insert(sub)
require_NumInterest(t, "foo", 0, 0)
require_NumInterest(t, "foo.bar", 1, 0)
require_NumInterest(t, "foo.bar.baz", 1, 0)
sl.Remove(sub)

sub = newSub("*.bar")
sl.Insert(sub)
require_NumInterest(t, "foo", 0, 0)
require_NumInterest(t, "foo.bar", 1, 0)
require_NumInterest(t, "foo.bar.baz", 0, 0)
sl.Remove(sub)

sub = newSub("*")
sl.Insert(sub)
require_NumInterest(t, "foo", 1, 0)
require_NumInterest(t, "foo.bar", 0, 0)
sl.Remove(sub)

// Try with queues now.
qsub := newQSub("foo", "bar")
sl.Insert(qsub)
require_NumInterest(t, "foo", 0, 1)
require_NumInterest(t, "foo.bar", 0, 0)

qsub2 := newQSub("foo", "baz")
sl.Insert(qsub2)
require_NumInterest(t, "foo", 0, 2)
require_NumInterest(t, "foo.bar", 0, 0)

// Remove first queue
sl.Remove(qsub)
require_NumInterest(t, "foo", 0, 1)
require_NumInterest(t, "foo.bar", 0, 0)

// Remove last.
sl.Remove(qsub2)
require_NumInterest(t, "foo", 0, 0)
require_NumInterest(t, "foo.bar", 0, 0)

// With wildcards now
qsub = newQSub("foo.*", "bar")
sl.Insert(qsub)
require_NumInterest(t, "foo", 0, 0)
require_NumInterest(t, "foo.bar", 0, 1)
require_NumInterest(t, "foo.bar.baz", 0, 0)

// Add another queue to the group
qsub2 = newQSub("foo.*", "baz")
sl.Insert(qsub2)
require_NumInterest(t, "foo", 0, 0)
require_NumInterest(t, "foo.bar", 0, 2)
require_NumInterest(t, "foo.bar.baz", 0, 0)
// Remove first queue
sl.Remove(qsub)
require_NumInterest(t, "foo", 0, 0)
require_NumInterest(t, "foo.bar", 0, 1)
require_NumInterest(t, "foo.bar.baz", 0, 0)

// Remove last
sl.Remove(qsub2)
require_NumInterest(t, "foo", 0, 0)
require_NumInterest(t, "foo.bar", 0, 0)
require_NumInterest(t, "foo.bar.baz", 0, 0)

qsub = newQSub("foo.>", "bar")
sl.Insert(qsub)
require_NumInterest(t, "foo", 0, 0)
require_NumInterest(t, "foo.bar", 0, 1)
require_NumInterest(t, "foo.bar.baz", 0, 1)

// Add another queue to the group
qsub2 = newQSub("foo.>", "baz")
sl.Insert(qsub2)
require_NumInterest(t, "foo", 0, 0)
require_NumInterest(t, "foo.bar", 0, 2)
require_NumInterest(t, "foo.bar.baz", 0, 2)

// Remove first queue
sl.Remove(qsub)
require_NumInterest(t, "foo", 0, 0)
require_NumInterest(t, "foo.bar", 0, 1)
require_NumInterest(t, "foo.bar.baz", 0, 1)

// Remove last
sl.Remove(qsub2)
require_NumInterest(t, "foo", 0, 0)
require_NumInterest(t, "foo.bar", 0, 0)
require_NumInterest(t, "foo.bar.baz", 0, 0)

qsub = newQSub("*.>", "bar")
sl.Insert(qsub)
require_NumInterest(t, "foo", 0, 0)
require_NumInterest(t, "foo.bar", 0, 1)
require_NumInterest(t, "foo.bar.baz", 0, 1)
sl.Remove(qsub)

qsub = newQSub("*.bar", "bar")
sl.Insert(qsub)
require_NumInterest(t, "foo", 0, 0)
require_NumInterest(t, "foo.bar", 0, 1)
require_NumInterest(t, "foo.bar.baz", 0, 0)
sl.Remove(qsub)

qsub = newQSub("*", "bar")
sl.Insert(qsub)
require_NumInterest(t, "foo", 0, 1)
require_NumInterest(t, "foo.bar", 0, 0)
sl.Remove(qsub)
}

func subsInit(pre string, toks []string) {
var sub string
for _, t := range toks {
Expand Down

0 comments on commit 8460ad2

Please sign in to comment.