Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add NumInterest to optimise searching for number of sublist entries #5918

Merged
merged 1 commit into from
Sep 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions server/accounts.go
Original file line number Diff line number Diff line change
Expand Up @@ -883,8 +883,8 @@ func (a *Account) Interest(subject string) int {
var nms int
a.mu.RLock()
if a.sl != nil {
res := a.sl.Match(subject)
nms = len(res.psubs) + len(res.qsubs)
np, nq := a.sl.NumInterest(subject)
nms = np + nq
}
a.mu.RUnlock()
return nms
Expand Down
10 changes: 5 additions & 5 deletions server/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -3234,7 +3234,7 @@ func (c *client) processUnsub(arg []byte) error {
func (c *client) checkDenySub(subject string) bool {
if denied, ok := c.mperms.dcache[subject]; ok {
return denied
} else if r := c.mperms.deny.Match(subject); len(r.psubs) != 0 {
} else if np, _ := c.mperms.deny.NumInterest(subject); np != 0 {
c.mperms.dcache[subject] = true
return true
} else {
Expand Down Expand Up @@ -3819,13 +3819,13 @@ func (c *client) pubAllowedFullCheck(subject string, fullCheck, hasLock bool) bo
allowed := true
// Cache miss, check allow then deny as needed.
if c.perms.pub.allow != nil {
r := c.perms.pub.allow.Match(subject)
allowed = len(r.psubs) != 0
np, _ := c.perms.pub.allow.NumInterest(subject)
allowed = np != 0
}
// If we have a deny list and are currently allowed, check that as well.
if allowed && c.perms.pub.deny != nil {
r := c.perms.pub.deny.Match(subject)
allowed = len(r.psubs) == 0
np, _ := c.perms.pub.deny.NumInterest(subject)
allowed = np == 0
}

// If we are currently not allowed but we are tracking reply subjects
Expand Down
12 changes: 6 additions & 6 deletions server/jetstream_cluster_4_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2544,10 +2544,10 @@ func TestJetStreamClusterAccountNRG(t *testing.T) {
// Check account interest for the AppendEntry subject.
checkFor(t, time.Second, time.Millisecond*25, func() error {
for _, s := range c.servers {
if !s.sys.account.sl.hasInterest(rg.asubj, true) {
if !s.sys.account.sl.HasInterest(rg.asubj) {
return fmt.Errorf("system account should have interest")
}
if s.gacc.sl.hasInterest(rg.asubj, true) {
if s.gacc.sl.HasInterest(rg.asubj) {
return fmt.Errorf("global account shouldn't have interest")
}
}
Expand Down Expand Up @@ -2587,10 +2587,10 @@ func TestJetStreamClusterAccountNRG(t *testing.T) {
// Check account interest for the AppendEntry subject.
checkFor(t, time.Second, time.Millisecond*25, func() error {
for _, s := range c.servers {
if !s.sys.account.sl.hasInterest(rg.asubj, true) {
if !s.sys.account.sl.HasInterest(rg.asubj) {
return fmt.Errorf("system account should have interest")
}
if s.gacc.sl.hasInterest(rg.asubj, true) {
if s.gacc.sl.HasInterest(rg.asubj) {
return fmt.Errorf("global account shouldn't have interest")
}
}
Expand Down Expand Up @@ -2625,10 +2625,10 @@ func TestJetStreamClusterAccountNRG(t *testing.T) {
// Check account interest for the AppendEntry subject.
checkFor(t, time.Second, time.Millisecond*25, func() error {
for _, s := range c.servers {
if s.sys.account.sl.hasInterest(rg.asubj, true) {
if s.sys.account.sl.HasInterest(rg.asubj) {
return fmt.Errorf("system account shouldn't have interest")
}
if !s.gacc.sl.hasInterest(rg.asubj, true) {
if !s.gacc.sl.HasInterest(rg.asubj) {
return fmt.Errorf("global account should have interest")
}
}
Expand Down
8 changes: 4 additions & 4 deletions server/mqtt.go
Original file line number Diff line number Diff line change
Expand Up @@ -4509,13 +4509,13 @@ func generatePubPerms(perms *Permissions) *perm {
func pubAllowed(perms *perm, subject string) bool {
allowed := true
if perms.allow != nil {
r := perms.allow.Match(subject)
allowed = len(r.psubs) != 0
np, _ := perms.allow.NumInterest(subject)
allowed = np != 0
}
// If we have a deny list and are currently allowed, check that as well.
if allowed && perms.deny != nil {
r := perms.deny.Match(subject)
allowed = len(r.psubs) == 0
np, _ := perms.deny.NumInterest(subject)
allowed = np == 0
}
return allowed
}
Expand Down
33 changes: 28 additions & 5 deletions server/sublist.go
Original file line number Diff line number Diff line change
Expand Up @@ -539,7 +539,14 @@ func (s *Sublist) MatchBytes(subject []byte) *SublistResult {
// HasInterest will return whether or not there is any interest in the subject.
// In cases where more detail is not required, this may be faster than Match.
func (s *Sublist) HasInterest(subject string) bool {
return s.hasInterest(subject, true)
return s.hasInterest(subject, true, nil, nil)
}

// NumInterest will return the number of subs/qsubs interested in the subject.
// In cases where more detail is not required, this may be faster than Match.
func (s *Sublist) NumInterest(subject string) (np, nq int) {
s.hasInterest(subject, true, &np, &nq)
return
}

func (s *Sublist) matchNoLock(subject string) *SublistResult {
Expand Down Expand Up @@ -623,14 +630,18 @@ func (s *Sublist) match(subject string, doLock bool, doCopyOnCache bool) *Sublis
return result
}

func (s *Sublist) hasInterest(subject string, doLock bool) bool {
func (s *Sublist) hasInterest(subject string, doLock bool, np, nq *int) bool {
// Check cache first.
if doLock {
s.RLock()
}
var matched bool
if s.cache != nil {
if r, ok := s.cache[subject]; ok {
if np != nil && nq != nil {
*np += len(r.psubs)
*nq += len(r.qsubs)
}
matched = len(r.psubs)+len(r.qsubs) > 0
}
}
Expand Down Expand Up @@ -663,7 +674,7 @@ func (s *Sublist) hasInterest(subject string, doLock bool) bool {
s.RLock()
defer s.RUnlock()
}
return matchLevelForAny(s.root, tokens)
return matchLevelForAny(s.root, tokens, np, nq)
}

// Remove entries in the cache until we are under the maximum.
Expand Down Expand Up @@ -778,17 +789,21 @@ func matchLevel(l *level, toks []string, results *SublistResult) {
}
}

func matchLevelForAny(l *level, toks []string) bool {
func matchLevelForAny(l *level, toks []string, np, nq *int) bool {
var pwc, n *node
for i, t := range toks {
if l == nil {
return false
}
if l.fwc != nil {
if np != nil && nq != nil {
*np += len(l.fwc.psubs)
*nq += len(l.fwc.qsubs)
}
return true
}
if pwc = l.pwc; pwc != nil {
if match := matchLevelForAny(pwc.next, toks[i+1:]); match {
if match := matchLevelForAny(pwc.next, toks[i+1:], np, nq); match {
return true
}
}
Expand All @@ -800,9 +815,17 @@ func matchLevelForAny(l *level, toks []string) bool {
}
}
if n != nil {
if np != nil && nq != nil {
*np += len(n.psubs)
*nq += len(n.qsubs)
}
return len(n.plist) > 0 || len(n.psubs) > 0 || len(n.qsubs) > 0
}
if pwc != nil {
if np != nil && nq != nil {
*np += len(pwc.psubs)
*nq += len(pwc.qsubs)
}
return len(pwc.plist) > 0 || len(pwc.psubs) > 0 || len(pwc.qsubs) > 0
}
return false
Expand Down
172 changes: 172 additions & 0 deletions server/sublist_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1768,6 +1768,178 @@ func TestSublistHasInterest(t *testing.T) {
sl.Remove(qsub)
}

func TestSublistNumInterest(t *testing.T) {
sl := NewSublistWithCache()
fooSub := newSub("foo")
sl.Insert(fooSub)

require_NumInterest := func(t *testing.T, subj string, wnp, wnq int) {
t.Helper()
np, nq := sl.NumInterest(subj)
require_Equal(t, np, wnp)
require_Equal(t, nq, wnq)
}

// Expect to find that "foo" matches but "bar" doesn't.
// At this point nothing should be in the cache.
require_NumInterest(t, "foo", 1, 0)
require_NumInterest(t, "bar", 0, 0)
require_Equal(t, sl.cacheHits, 0)

// Now call Match(), which will populate the cache.
sl.Match("foo")
require_Equal(t, sl.cacheHits, 0)

// Future calls to HasInterest() should hit the cache now.
for i := uint64(1); i <= 5; i++ {
require_NumInterest(t, "foo", 1, 0)
require_Equal(t, sl.cacheHits, i)
}

// Call Match on a subject we know there is no match.
sl.Match("bar")
require_NumInterest(t, "bar", 0, 0)

// Remove fooSub and check interest again
sl.Remove(fooSub)
require_NumInterest(t, "foo", 0, 0)

// Try with some wildcards
sub := newSub("foo.*")
sl.Insert(sub)
require_NumInterest(t, "foo", 0, 0)
require_NumInterest(t, "foo.bar", 1, 0)
require_NumInterest(t, "foo.bar.baz", 0, 0)

// Remove sub, there should be no interest
sl.Remove(sub)
require_NumInterest(t, "foo", 0, 0)
require_NumInterest(t, "foo.bar", 0, 0)
require_NumInterest(t, "foo.bar.baz", 0, 0)

sub = newSub("foo.>")
sl.Insert(sub)
require_NumInterest(t, "foo", 0, 0)
require_NumInterest(t, "foo.bar", 1, 0)
require_NumInterest(t, "foo.bar.baz", 1, 0)

sl.Remove(sub)
require_NumInterest(t, "foo", 0, 0)
require_NumInterest(t, "foo.bar", 0, 0)
require_NumInterest(t, "foo.bar.baz", 0, 0)

sub = newSub("*.>")
sl.Insert(sub)
require_NumInterest(t, "foo", 0, 0)
require_NumInterest(t, "foo.bar", 1, 0)
require_NumInterest(t, "foo.bar.baz", 1, 0)
sl.Remove(sub)

sub = newSub("*.bar")
sl.Insert(sub)
require_NumInterest(t, "foo", 0, 0)
require_NumInterest(t, "foo.bar", 1, 0)
require_NumInterest(t, "foo.bar.baz", 0, 0)
sl.Remove(sub)

sub = newSub("*")
sl.Insert(sub)
require_NumInterest(t, "foo", 1, 0)
require_NumInterest(t, "foo.bar", 0, 0)
sl.Remove(sub)

// Try with queues now.
qsub := newQSub("foo", "bar")
sl.Insert(qsub)
require_NumInterest(t, "foo", 0, 1)
require_NumInterest(t, "foo.bar", 0, 0)

qsub2 := newQSub("foo", "baz")
sl.Insert(qsub2)
require_NumInterest(t, "foo", 0, 2)
require_NumInterest(t, "foo.bar", 0, 0)

// Remove first queue
sl.Remove(qsub)
require_NumInterest(t, "foo", 0, 1)
require_NumInterest(t, "foo.bar", 0, 0)

// Remove last.
sl.Remove(qsub2)
require_NumInterest(t, "foo", 0, 0)
require_NumInterest(t, "foo.bar", 0, 0)

// With wildcards now
qsub = newQSub("foo.*", "bar")
sl.Insert(qsub)
require_NumInterest(t, "foo", 0, 0)
require_NumInterest(t, "foo.bar", 0, 1)
require_NumInterest(t, "foo.bar.baz", 0, 0)

// Add another queue to the group
qsub2 = newQSub("foo.*", "baz")
sl.Insert(qsub2)
require_NumInterest(t, "foo", 0, 0)
require_NumInterest(t, "foo.bar", 0, 2)
require_NumInterest(t, "foo.bar.baz", 0, 0)
// Remove first queue
sl.Remove(qsub)
require_NumInterest(t, "foo", 0, 0)
require_NumInterest(t, "foo.bar", 0, 1)
require_NumInterest(t, "foo.bar.baz", 0, 0)

// Remove last
sl.Remove(qsub2)
require_NumInterest(t, "foo", 0, 0)
require_NumInterest(t, "foo.bar", 0, 0)
require_NumInterest(t, "foo.bar.baz", 0, 0)

qsub = newQSub("foo.>", "bar")
sl.Insert(qsub)
require_NumInterest(t, "foo", 0, 0)
require_NumInterest(t, "foo.bar", 0, 1)
require_NumInterest(t, "foo.bar.baz", 0, 1)

// Add another queue to the group
qsub2 = newQSub("foo.>", "baz")
sl.Insert(qsub2)
require_NumInterest(t, "foo", 0, 0)
require_NumInterest(t, "foo.bar", 0, 2)
require_NumInterest(t, "foo.bar.baz", 0, 2)

// Remove first queue
sl.Remove(qsub)
require_NumInterest(t, "foo", 0, 0)
require_NumInterest(t, "foo.bar", 0, 1)
require_NumInterest(t, "foo.bar.baz", 0, 1)

// Remove last
sl.Remove(qsub2)
require_NumInterest(t, "foo", 0, 0)
require_NumInterest(t, "foo.bar", 0, 0)
require_NumInterest(t, "foo.bar.baz", 0, 0)

qsub = newQSub("*.>", "bar")
sl.Insert(qsub)
require_NumInterest(t, "foo", 0, 0)
require_NumInterest(t, "foo.bar", 0, 1)
require_NumInterest(t, "foo.bar.baz", 0, 1)
sl.Remove(qsub)

qsub = newQSub("*.bar", "bar")
sl.Insert(qsub)
require_NumInterest(t, "foo", 0, 0)
require_NumInterest(t, "foo.bar", 0, 1)
require_NumInterest(t, "foo.bar.baz", 0, 0)
sl.Remove(qsub)

qsub = newQSub("*", "bar")
sl.Insert(qsub)
require_NumInterest(t, "foo", 0, 1)
require_NumInterest(t, "foo.bar", 0, 0)
sl.Remove(qsub)
}

func subsInit(pre string, toks []string) {
var sub string
for _, t := range toks {
Expand Down