From 8460ad25c93c58bc42f937d71cb4af5c7bbe5506 Mon Sep 17 00:00:00 2001 From: Neil Twigg Date: Mon, 23 Sep 2024 16:33:51 +0100 Subject: [PATCH] Add `NumInterest` to optimise searching for number of sublist entries Signed-off-by: Neil Twigg --- server/accounts.go | 4 +- server/client.go | 10 +- server/jetstream_cluster_4_test.go | 12 +- server/mqtt.go | 8 +- server/sublist.go | 33 +++++- server/sublist_test.go | 172 +++++++++++++++++++++++++++++ 6 files changed, 217 insertions(+), 22 deletions(-) diff --git a/server/accounts.go b/server/accounts.go index 1e47f8c8c9e..e4569be371d 100644 --- a/server/accounts.go +++ b/server/accounts.go @@ -883,8 +883,8 @@ func (a *Account) Interest(subject string) int { var nms int a.mu.RLock() if a.sl != nil { - res := a.sl.Match(subject) - nms = len(res.psubs) + len(res.qsubs) + np, nq := a.sl.NumInterest(subject) + nms = np + nq } a.mu.RUnlock() return nms diff --git a/server/client.go b/server/client.go index e039f31349a..482670dc788 100644 --- a/server/client.go +++ b/server/client.go @@ -3234,7 +3234,7 @@ func (c *client) processUnsub(arg []byte) error { func (c *client) checkDenySub(subject string) bool { if denied, ok := c.mperms.dcache[subject]; ok { return denied - } else if r := c.mperms.deny.Match(subject); len(r.psubs) != 0 { + } else if np, _ := c.mperms.deny.NumInterest(subject); np != 0 { c.mperms.dcache[subject] = true return true } else { @@ -3819,13 +3819,13 @@ func (c *client) pubAllowedFullCheck(subject string, fullCheck, hasLock bool) bo allowed := true // Cache miss, check allow then deny as needed. if c.perms.pub.allow != nil { - r := c.perms.pub.allow.Match(subject) - allowed = len(r.psubs) != 0 + np, _ := c.perms.pub.allow.NumInterest(subject) + allowed = np != 0 } // If we have a deny list and are currently allowed, check that as well. if allowed && c.perms.pub.deny != nil { - r := c.perms.pub.deny.Match(subject) - allowed = len(r.psubs) == 0 + np, _ := c.perms.pub.deny.NumInterest(subject) + allowed = np == 0 } // If we are currently not allowed but we are tracking reply subjects diff --git a/server/jetstream_cluster_4_test.go b/server/jetstream_cluster_4_test.go index 79b62e13c24..c62fe4e461c 100644 --- a/server/jetstream_cluster_4_test.go +++ b/server/jetstream_cluster_4_test.go @@ -2544,10 +2544,10 @@ func TestJetStreamClusterAccountNRG(t *testing.T) { // Check account interest for the AppendEntry subject. checkFor(t, time.Second, time.Millisecond*25, func() error { for _, s := range c.servers { - if !s.sys.account.sl.hasInterest(rg.asubj, true) { + if !s.sys.account.sl.HasInterest(rg.asubj) { return fmt.Errorf("system account should have interest") } - if s.gacc.sl.hasInterest(rg.asubj, true) { + if s.gacc.sl.HasInterest(rg.asubj) { return fmt.Errorf("global account shouldn't have interest") } } @@ -2587,10 +2587,10 @@ func TestJetStreamClusterAccountNRG(t *testing.T) { // Check account interest for the AppendEntry subject. checkFor(t, time.Second, time.Millisecond*25, func() error { for _, s := range c.servers { - if !s.sys.account.sl.hasInterest(rg.asubj, true) { + if !s.sys.account.sl.HasInterest(rg.asubj) { return fmt.Errorf("system account should have interest") } - if s.gacc.sl.hasInterest(rg.asubj, true) { + if s.gacc.sl.HasInterest(rg.asubj) { return fmt.Errorf("global account shouldn't have interest") } } @@ -2625,10 +2625,10 @@ func TestJetStreamClusterAccountNRG(t *testing.T) { // Check account interest for the AppendEntry subject. checkFor(t, time.Second, time.Millisecond*25, func() error { for _, s := range c.servers { - if s.sys.account.sl.hasInterest(rg.asubj, true) { + if s.sys.account.sl.HasInterest(rg.asubj) { return fmt.Errorf("system account shouldn't have interest") } - if !s.gacc.sl.hasInterest(rg.asubj, true) { + if !s.gacc.sl.HasInterest(rg.asubj) { return fmt.Errorf("global account should have interest") } } diff --git a/server/mqtt.go b/server/mqtt.go index 120dda2783b..1c2aa1405bc 100644 --- a/server/mqtt.go +++ b/server/mqtt.go @@ -4509,13 +4509,13 @@ func generatePubPerms(perms *Permissions) *perm { func pubAllowed(perms *perm, subject string) bool { allowed := true if perms.allow != nil { - r := perms.allow.Match(subject) - allowed = len(r.psubs) != 0 + np, _ := perms.allow.NumInterest(subject) + allowed = np != 0 } // If we have a deny list and are currently allowed, check that as well. if allowed && perms.deny != nil { - r := perms.deny.Match(subject) - allowed = len(r.psubs) == 0 + np, _ := perms.deny.NumInterest(subject) + allowed = np == 0 } return allowed } diff --git a/server/sublist.go b/server/sublist.go index 1ec878a19e0..f4471f337c0 100644 --- a/server/sublist.go +++ b/server/sublist.go @@ -539,7 +539,14 @@ func (s *Sublist) MatchBytes(subject []byte) *SublistResult { // HasInterest will return whether or not there is any interest in the subject. // In cases where more detail is not required, this may be faster than Match. func (s *Sublist) HasInterest(subject string) bool { - return s.hasInterest(subject, true) + return s.hasInterest(subject, true, nil, nil) +} + +// NumInterest will return the number of subs/qsubs interested in the subject. +// In cases where more detail is not required, this may be faster than Match. +func (s *Sublist) NumInterest(subject string) (np, nq int) { + s.hasInterest(subject, true, &np, &nq) + return } func (s *Sublist) matchNoLock(subject string) *SublistResult { @@ -623,7 +630,7 @@ func (s *Sublist) match(subject string, doLock bool, doCopyOnCache bool) *Sublis return result } -func (s *Sublist) hasInterest(subject string, doLock bool) bool { +func (s *Sublist) hasInterest(subject string, doLock bool, np, nq *int) bool { // Check cache first. if doLock { s.RLock() @@ -631,6 +638,10 @@ func (s *Sublist) hasInterest(subject string, doLock bool) bool { var matched bool if s.cache != nil { if r, ok := s.cache[subject]; ok { + if np != nil && nq != nil { + *np += len(r.psubs) + *nq += len(r.qsubs) + } matched = len(r.psubs)+len(r.qsubs) > 0 } } @@ -663,7 +674,7 @@ func (s *Sublist) hasInterest(subject string, doLock bool) bool { s.RLock() defer s.RUnlock() } - return matchLevelForAny(s.root, tokens) + return matchLevelForAny(s.root, tokens, np, nq) } // Remove entries in the cache until we are under the maximum. @@ -778,17 +789,21 @@ func matchLevel(l *level, toks []string, results *SublistResult) { } } -func matchLevelForAny(l *level, toks []string) bool { +func matchLevelForAny(l *level, toks []string, np, nq *int) bool { var pwc, n *node for i, t := range toks { if l == nil { return false } if l.fwc != nil { + if np != nil && nq != nil { + *np += len(l.fwc.psubs) + *nq += len(l.fwc.qsubs) + } return true } if pwc = l.pwc; pwc != nil { - if match := matchLevelForAny(pwc.next, toks[i+1:]); match { + if match := matchLevelForAny(pwc.next, toks[i+1:], np, nq); match { return true } } @@ -800,9 +815,17 @@ func matchLevelForAny(l *level, toks []string) bool { } } if n != nil { + if np != nil && nq != nil { + *np += len(n.psubs) + *nq += len(n.qsubs) + } return len(n.plist) > 0 || len(n.psubs) > 0 || len(n.qsubs) > 0 } if pwc != nil { + if np != nil && nq != nil { + *np += len(pwc.psubs) + *nq += len(pwc.qsubs) + } return len(pwc.plist) > 0 || len(pwc.psubs) > 0 || len(pwc.qsubs) > 0 } return false diff --git a/server/sublist_test.go b/server/sublist_test.go index c74a10bc850..61ea75575d6 100644 --- a/server/sublist_test.go +++ b/server/sublist_test.go @@ -1768,6 +1768,178 @@ func TestSublistHasInterest(t *testing.T) { sl.Remove(qsub) } +func TestSublistNumInterest(t *testing.T) { + sl := NewSublistWithCache() + fooSub := newSub("foo") + sl.Insert(fooSub) + + require_NumInterest := func(t *testing.T, subj string, wnp, wnq int) { + t.Helper() + np, nq := sl.NumInterest(subj) + require_Equal(t, np, wnp) + require_Equal(t, nq, wnq) + } + + // Expect to find that "foo" matches but "bar" doesn't. + // At this point nothing should be in the cache. + require_NumInterest(t, "foo", 1, 0) + require_NumInterest(t, "bar", 0, 0) + require_Equal(t, sl.cacheHits, 0) + + // Now call Match(), which will populate the cache. + sl.Match("foo") + require_Equal(t, sl.cacheHits, 0) + + // Future calls to HasInterest() should hit the cache now. + for i := uint64(1); i <= 5; i++ { + require_NumInterest(t, "foo", 1, 0) + require_Equal(t, sl.cacheHits, i) + } + + // Call Match on a subject we know there is no match. + sl.Match("bar") + require_NumInterest(t, "bar", 0, 0) + + // Remove fooSub and check interest again + sl.Remove(fooSub) + require_NumInterest(t, "foo", 0, 0) + + // Try with some wildcards + sub := newSub("foo.*") + sl.Insert(sub) + require_NumInterest(t, "foo", 0, 0) + require_NumInterest(t, "foo.bar", 1, 0) + require_NumInterest(t, "foo.bar.baz", 0, 0) + + // Remove sub, there should be no interest + sl.Remove(sub) + require_NumInterest(t, "foo", 0, 0) + require_NumInterest(t, "foo.bar", 0, 0) + require_NumInterest(t, "foo.bar.baz", 0, 0) + + sub = newSub("foo.>") + sl.Insert(sub) + require_NumInterest(t, "foo", 0, 0) + require_NumInterest(t, "foo.bar", 1, 0) + require_NumInterest(t, "foo.bar.baz", 1, 0) + + sl.Remove(sub) + require_NumInterest(t, "foo", 0, 0) + require_NumInterest(t, "foo.bar", 0, 0) + require_NumInterest(t, "foo.bar.baz", 0, 0) + + sub = newSub("*.>") + sl.Insert(sub) + require_NumInterest(t, "foo", 0, 0) + require_NumInterest(t, "foo.bar", 1, 0) + require_NumInterest(t, "foo.bar.baz", 1, 0) + sl.Remove(sub) + + sub = newSub("*.bar") + sl.Insert(sub) + require_NumInterest(t, "foo", 0, 0) + require_NumInterest(t, "foo.bar", 1, 0) + require_NumInterest(t, "foo.bar.baz", 0, 0) + sl.Remove(sub) + + sub = newSub("*") + sl.Insert(sub) + require_NumInterest(t, "foo", 1, 0) + require_NumInterest(t, "foo.bar", 0, 0) + sl.Remove(sub) + + // Try with queues now. + qsub := newQSub("foo", "bar") + sl.Insert(qsub) + require_NumInterest(t, "foo", 0, 1) + require_NumInterest(t, "foo.bar", 0, 0) + + qsub2 := newQSub("foo", "baz") + sl.Insert(qsub2) + require_NumInterest(t, "foo", 0, 2) + require_NumInterest(t, "foo.bar", 0, 0) + + // Remove first queue + sl.Remove(qsub) + require_NumInterest(t, "foo", 0, 1) + require_NumInterest(t, "foo.bar", 0, 0) + + // Remove last. + sl.Remove(qsub2) + require_NumInterest(t, "foo", 0, 0) + require_NumInterest(t, "foo.bar", 0, 0) + + // With wildcards now + qsub = newQSub("foo.*", "bar") + sl.Insert(qsub) + require_NumInterest(t, "foo", 0, 0) + require_NumInterest(t, "foo.bar", 0, 1) + require_NumInterest(t, "foo.bar.baz", 0, 0) + + // Add another queue to the group + qsub2 = newQSub("foo.*", "baz") + sl.Insert(qsub2) + require_NumInterest(t, "foo", 0, 0) + require_NumInterest(t, "foo.bar", 0, 2) + require_NumInterest(t, "foo.bar.baz", 0, 0) + // Remove first queue + sl.Remove(qsub) + require_NumInterest(t, "foo", 0, 0) + require_NumInterest(t, "foo.bar", 0, 1) + require_NumInterest(t, "foo.bar.baz", 0, 0) + + // Remove last + sl.Remove(qsub2) + require_NumInterest(t, "foo", 0, 0) + require_NumInterest(t, "foo.bar", 0, 0) + require_NumInterest(t, "foo.bar.baz", 0, 0) + + qsub = newQSub("foo.>", "bar") + sl.Insert(qsub) + require_NumInterest(t, "foo", 0, 0) + require_NumInterest(t, "foo.bar", 0, 1) + require_NumInterest(t, "foo.bar.baz", 0, 1) + + // Add another queue to the group + qsub2 = newQSub("foo.>", "baz") + sl.Insert(qsub2) + require_NumInterest(t, "foo", 0, 0) + require_NumInterest(t, "foo.bar", 0, 2) + require_NumInterest(t, "foo.bar.baz", 0, 2) + + // Remove first queue + sl.Remove(qsub) + require_NumInterest(t, "foo", 0, 0) + require_NumInterest(t, "foo.bar", 0, 1) + require_NumInterest(t, "foo.bar.baz", 0, 1) + + // Remove last + sl.Remove(qsub2) + require_NumInterest(t, "foo", 0, 0) + require_NumInterest(t, "foo.bar", 0, 0) + require_NumInterest(t, "foo.bar.baz", 0, 0) + + qsub = newQSub("*.>", "bar") + sl.Insert(qsub) + require_NumInterest(t, "foo", 0, 0) + require_NumInterest(t, "foo.bar", 0, 1) + require_NumInterest(t, "foo.bar.baz", 0, 1) + sl.Remove(qsub) + + qsub = newQSub("*.bar", "bar") + sl.Insert(qsub) + require_NumInterest(t, "foo", 0, 0) + require_NumInterest(t, "foo.bar", 0, 1) + require_NumInterest(t, "foo.bar.baz", 0, 0) + sl.Remove(qsub) + + qsub = newQSub("*", "bar") + sl.Insert(qsub) + require_NumInterest(t, "foo", 0, 1) + require_NumInterest(t, "foo.bar", 0, 0) + sl.Remove(qsub) +} + func subsInit(pre string, toks []string) { var sub string for _, t := range toks {