diff --git a/server/auth_callout_test.go b/server/auth_callout_test.go index 26fe1e857e1..a578f9a3185 100644 --- a/server/auth_callout_test.go +++ b/server/auth_callout_test.go @@ -250,6 +250,7 @@ func TestAuthCalloutBasics(t *testing.T) { // This one will use callout since not defined in server config. nc := at.Connect(nats.UserInfo("dlc", "zzz")) + defer nc.Close() resp, err := nc.Request(userDirectInfoSubj, nil, time.Second) require_NoError(t, err) @@ -319,6 +320,7 @@ func TestAuthCalloutMultiAccounts(t *testing.T) { // This one will use callout since not defined in server config. nc := at.Connect(nats.UserInfo("dlc", "zzz")) + defer nc.Close() resp, err := nc.Request(userDirectInfoSubj, nil, time.Second) require_NoError(t, err) @@ -373,13 +375,13 @@ func TestAuthCalloutAllowedAccounts(t *testing.T) { t.Helper() var nc *nats.Conn - defer nc.Close() // Assume no auth user. if password == "" { nc = at.Connect() } else { nc = at.Connect(nats.UserInfo(user, password)) } + defer nc.Close() resp, err := nc.Request(userDirectInfoSubj, nil, time.Second) require_NoError(t, err) @@ -470,6 +472,7 @@ func TestAuthCalloutClientTLSCerts(t *testing.T) { nats.ClientCert("../test/configs/certs/tlsauth/client2.pem", "../test/configs/certs/tlsauth/client2-key.pem"), nats.RootCAs("../test/configs/certs/tlsauth/ca.pem"), ) + defer nc.Close() resp, err := nc.Request(userDirectInfoSubj, nil, time.Second) require_NoError(t, err) @@ -521,6 +524,7 @@ func TestAuthCalloutVerifiedUserCalloutsWithSig(t *testing.T) { require_NoError(t, err) nc := ac.Connect(nkeyOpt) + defer nc.Close() // Make sure that the callout was called. if atomic.LoadUint32(&callouts) != 1 { @@ -740,6 +744,7 @@ func TestAuthCalloutOperatorModeBasics(t *testing.T) { // Send correct token. This should switch us to the test account. nc := ac.Connect(nats.UserCredentials(creds), nats.Token(secretToken)) require_NoError(t, err) + defer nc.Close() resp, err = nc.Request(userDirectInfoSubj, nil, time.Second) require_NoError(t, err) @@ -760,6 +765,7 @@ func TestAuthCalloutOperatorModeBasics(t *testing.T) { // Send the signing key token. This should switch us to the test account, but the user // is signed with the account signing key nc = ac.Connect(nats.UserCredentials(creds), nats.Token(skKeyToken)) + defer nc.Close() resp, err = nc.Request(userDirectInfoSubj, nil, time.Second) require_NoError(t, err) @@ -779,6 +785,7 @@ func TestAuthCalloutOperatorModeBasics(t *testing.T) { // is signed with the account signing key nc = ac.Connect(nats.UserCredentials(creds), nats.Token(scopedToken)) require_NoError(t, err) + defer nc.Close() resp, err = nc.Request(userDirectInfoSubj, nil, time.Second) require_NoError(t, err) @@ -1004,10 +1011,12 @@ func TestAuthCalloutServerConfigEncryption(t *testing.T) { ac := NewAuthTest(t, conf, handler, nats.UserInfo("auth", "pwd")) defer ac.Cleanup() - ac.Connect(nats.UserInfo("dlc", "zzz")) + nc := ac.Connect(nats.UserInfo("dlc", "zzz")) + defer nc.Close() // Authorization services can optionally encrypt the responses using the server's public xkey. - ac.Connect(nats.UserInfo("dlc", "xxx")) + nc = ac.Connect(nats.UserInfo("dlc", "xxx")) + defer nc.Close() } func TestAuthCalloutOperatorModeEncryption(t *testing.T) { @@ -1099,10 +1108,12 @@ func TestAuthCalloutOperatorModeEncryption(t *testing.T) { defer removeFile(t, creds) // This will receive an encrypted request to the auth service but send plaintext response. 
- ac.Connect(nats.UserCredentials(creds), nats.Token(tokenA)) + nc := ac.Connect(nats.UserCredentials(creds), nats.Token(tokenA)) + defer nc.Close() // This will receive an encrypted request to the auth service and send an encrypted response. - ac.Connect(nats.UserCredentials(creds), nats.Token(tokenB)) + nc = ac.Connect(nats.UserCredentials(creds), nats.Token(tokenB)) + defer nc.Close() } func TestAuthCalloutServerTags(t *testing.T) { @@ -1130,7 +1141,8 @@ func TestAuthCalloutServerTags(t *testing.T) { ac := NewAuthTest(t, conf, handler, nats.UserInfo("auth", "pwd")) defer ac.Cleanup() - ac.Connect() + nc := ac.Connect() + defer nc.Close() tags := <-tch require_True(t, len(tags) == 2) @@ -1163,7 +1175,8 @@ func TestAuthCalloutServerClusterAndVersion(t *testing.T) { ac := NewAuthTest(t, conf, handler, nats.UserInfo("auth", "pwd")) defer ac.Cleanup() - ac.Connect() + nc := ac.Connect() + defer nc.Close() cluster := <-ch require_True(t, cluster == "HUB") @@ -1266,7 +1279,8 @@ func TestAuthCalloutAuthErrEvents(t *testing.T) { require_NoError(t, err) // This one will use callout since not defined in server config. - ac.Connect(nats.UserInfo("dlc", "zzz")) + nc := ac.Connect(nats.UserInfo("dlc", "zzz")) + defer nc.Close() checkSubsPending(t, sub, 0) checkAuthErrEvent := func(user, pass, reason string) { @@ -1326,6 +1340,7 @@ func TestAuthCalloutConnectEvents(t *testing.T) { // Setup system user. snc := ac.Connect(nats.UserInfo("admin", "s3cr3t!")) + defer snc.Close() // Allow this connect event to pass us by.. time.Sleep(250 * time.Millisecond) @@ -1697,6 +1712,7 @@ func TestAuthCalloutOperator_AnyAccount(t *testing.T) { // Send correct token. This should switch us to the A account. nc := ac.Connect(nats.UserCredentials(creds), nats.Token("PutMeInA")) require_NoError(t, err) + defer nc.Close() resp, err = nc.Request(userDirectInfoSubj, nil, time.Second) require_NoError(t, err) @@ -1708,6 +1724,7 @@ func TestAuthCalloutOperator_AnyAccount(t *testing.T) { nc = ac.Connect(nats.UserCredentials(creds), nats.Token("PutMeInB")) require_NoError(t, err) + defer nc.Close() resp, err = nc.Request(userDirectInfoSubj, nil, time.Second) require_NoError(t, err) @@ -1785,6 +1802,7 @@ func TestAuthCalloutWSClientTLSCerts(t *testing.T) { nats.ClientCert("../test/configs/certs/tlsauth/client2.pem", "../test/configs/certs/tlsauth/client2-key.pem"), nats.RootCAs("../test/configs/certs/tlsauth/ca.pem"), ) + defer nc.Close() resp, err := nc.Request(userDirectInfoSubj, nil, time.Second) require_NoError(t, err) @@ -1992,6 +2010,7 @@ func TestOperatorModeUserRevocation(t *testing.T) { // connect the system user sysNC, err := ac.NewClient(nats.UserCredentials(sysCreds)) require_NoError(t, err) + defer sysNC.Close() // Bearer token etc.. // This is used by all users, and the customization will be in other connect args. diff --git a/server/auth_test.go b/server/auth_test.go index 15720144f18..05c3402f7d6 100644 --- a/server/auth_test.go +++ b/server/auth_test.go @@ -292,6 +292,7 @@ func TestNoAuthUserNkey(t *testing.T) { // Make sure we connect ok and to the correct account. 
nc := natsConnect(t, s.ClientURL()) + defer nc.Close() resp, err := nc.Request(userDirectInfoSubj, nil, time.Second) require_NoError(t, err) response := ServerAPIResponse{Data: &UserInfo{}} diff --git a/server/client.go b/server/client.go index 41771a575a6..87fc410a32e 100644 --- a/server/client.go +++ b/server/client.go @@ -4831,17 +4831,18 @@ func (c *client) processMsgResults(acc *Account, r *SublistResult, msg, deliver, // Here we just care about a client or leaf and skipping a leaf and preferring locals. if dst := sub.client.kind; dst == ROUTER || dst == LEAF { if (src == LEAF || src == CLIENT) && dst == LEAF { + // Remember that leaf in case we don't find any other candidate. if rsub == nil { rsub = sub } continue } else { - c.addSubToRouteTargets(sub) - // Clear rsub since we added a sub. - rsub = nil - if flags&pmrCollectQueueNames != 0 { - queues = append(queues, sub.queue) + // We would be picking a route, but if we had remembered a "hub" leaf, + // then pick that one instead of the route. + if rsub != nil && rsub.client.kind == LEAF && rsub.client.isHubLeafNode() { + break } + rsub = sub } break } @@ -4923,8 +4924,8 @@ func (c *client) processMsgResults(acc *Account, r *SublistResult, msg, deliver, } if rsub != nil { - // If we are here we tried to deliver to a local qsub - // but failed. So we will send it to a remote or leaf node. + // We are here if we have selected a leaf or route as the destination, + // or if we tried to deliver to a local qsub but failed. c.addSubToRouteTargets(rsub) if flags&pmrCollectQueueNames != 0 { queues = append(queues, rsub.queue) diff --git a/server/client_test.go b/server/client_test.go index 9192b281dbc..45419ce5ae5 100644 --- a/server/client_test.go +++ b/server/client_test.go @@ -2624,6 +2624,7 @@ func TestTLSClientHandshakeFirst(t *testing.T) { } nc, err := nats.Connect(fmt.Sprintf("tls://localhost:%d", o.Port), opts...) 
if expectedOk { + defer nc.Close() if err != nil { t.Fatalf("Unexpected error: %v", err) } @@ -3072,8 +3073,9 @@ func TestClientFlushOutboundNoSlowConsumer(t *testing.T) { wait := make(chan error) - nca, err := nats.Connect(proxy.clientURL()) + nca, err := nats.Connect(proxy.clientURL(), nats.NoCallbacksAfterClientClose()) require_NoError(t, err) + defer nca.Close() nca.SetDisconnectErrHandler(func(c *nats.Conn, err error) { wait <- err close(wait) @@ -3081,6 +3083,7 @@ func TestClientFlushOutboundNoSlowConsumer(t *testing.T) { ncb, err := nats.Connect(s.ClientURL()) require_NoError(t, err) + defer ncb.Close() _, err = nca.Subscribe("test", func(msg *nats.Msg) { wait <- nil diff --git a/server/gateway_test.go b/server/gateway_test.go index 4c8eb774c68..b397bfedc10 100644 --- a/server/gateway_test.go +++ b/server/gateway_test.go @@ -804,12 +804,8 @@ func testFatalErrorOnStart(t *testing.T, o *Options, errTxt string) { defer s.Shutdown() l := &captureFatalLogger{fatalCh: make(chan string, 1)} s.SetLogger(l, false, false) - wg := sync.WaitGroup{} - wg.Add(1) - go func() { - s.Start() - wg.Done() - }() + // This does not block + s.Start() select { case e := <-l.fatalCh: if !strings.Contains(e, errTxt) { @@ -819,7 +815,7 @@ func testFatalErrorOnStart(t *testing.T, o *Options, errTxt string) { t.Fatal("Should have got a fatal error") } s.Shutdown() - wg.Wait() + s.WaitForShutdown() } func TestGatewayListenError(t *testing.T) { diff --git a/server/jwt_test.go b/server/jwt_test.go index d45b1dd53e1..f4235a2b973 100644 --- a/server/jwt_test.go +++ b/server/jwt_test.go @@ -7026,6 +7026,7 @@ func TestJWTImportsOnServerRestartAndClientsReconnect(t *testing.T) { for range time.NewTicker(200 * time.Millisecond).C { select { case <-ctx.Done(): + return default: } send(t) diff --git a/server/leafnode_test.go b/server/leafnode_test.go index eeba7737d95..698066c03db 100644 --- a/server/leafnode_test.go +++ b/server/leafnode_test.go @@ -2314,39 +2314,30 @@ func TestLeafNodeNoDuplicateWithinCluster(t *testing.T) { ncSrv1 := natsConnect(t, srv1.ClientURL()) defer ncSrv1.Close() natsQueueSub(t, ncSrv1, "foo", "queue", func(m *nats.Msg) { - m.Respond([]byte("from srv1")) + m.Data = []byte("from srv1") + m.RespondMsg(m) }) ncLeaf1 := natsConnect(t, leaf1.ClientURL()) defer ncLeaf1.Close() natsQueueSub(t, ncLeaf1, "foo", "queue", func(m *nats.Msg) { - m.Respond([]byte("from leaf1")) + m.Data = []byte("from leaf1") + m.RespondMsg(m) }) ncLeaf2 := natsConnect(t, leaf2.ClientURL()) defer ncLeaf2.Close() // Check that "foo" interest is available everywhere. - // For this test, we want to make sure that the 2 queue subs are - // registered on all servers, so we don't use checkSubInterest - // which would simply return "true" if there is any interest on "foo". - servers := []*Server{srv1, leaf1, leaf2} - checkFor(t, time.Second, 15*time.Millisecond, func() error { - for _, s := range servers { - acc, err := s.LookupAccount(globalAccountName) - if err != nil { - return err - } - acc.mu.RLock() - r := acc.sl.Match("foo") - ok := len(r.qsubs) == 1 && len(r.qsubs[0]) == 2 - acc.mu.RUnlock() - if !ok { - return fmt.Errorf("interest not propagated on %q", s.Name()) + for _, s := range []*Server{srv1, leaf1, leaf2} { + gacc := s.GlobalAccount() + checkFor(t, time.Second, 15*time.Millisecond, func() error { + if n := gacc.Interest("foo"); n != 2 { + return fmt.Errorf("Expected interest for %q to be 2, got %v", "foo", n) } - } - return nil - }) + return nil + }) + } // Send requests (from leaf2). 
For this test to make sure that // there is no duplicate, we want to make sure that we check for @@ -2360,15 +2351,22 @@ func TestLeafNodeNoDuplicateWithinCluster(t *testing.T) { checkSubInterest(t, leaf1, globalAccountName, "reply_subj", time.Second) checkSubInterest(t, leaf2, globalAccountName, "reply_subj", time.Second) - for i := 0; i < 5; i++ { + for i := 0; i < 100; i++ { // Now send the request - natsPubReq(t, ncLeaf2, "foo", sub.Subject, []byte("req")) + reqID := fmt.Sprintf("req.%d", i) + msg := nats.NewMsg("foo") + msg.Data = []byte("req") + msg.Header.Set("ReqId", reqID) + msg.Reply = sub.Subject + if err := ncLeaf2.PublishMsg(msg); err != nil { + t.Fatalf("Error on publish: %v", err) + } // Check that we get the reply replyMsg := natsNexMsg(t, sub, time.Second) - // But make sure we received only 1! - if otherReply, _ := sub.NextMsg(100 * time.Millisecond); otherReply != nil { - t.Fatalf("Received duplicate reply, first was %q, followed by %q", - replyMsg.Data, otherReply.Data) + // But make sure no duplicate. We do so by checking that the reply's + // header ReqId matches our current reqID. + if respReqID := replyMsg.Header.Get("ReqId"); respReqID != reqID { + t.Fatalf("Current request is %q, got duplicate with %q", reqID, respReqID) } // We also should have preferred the queue sub that is in the leaf cluster. if string(replyMsg.Data) != "from leaf1" { @@ -3044,6 +3042,7 @@ func TestLeafNodeWSSubPath(t *testing.T) { ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { attempts <- r.URL.String() })) + defer ts.Close() u, _ := url.Parse(fmt.Sprintf("%v/some/path", ts.URL)) u.Scheme = "ws" lo2.LeafNode.Remotes = []*RemoteLeafOpts{ @@ -4010,6 +4009,114 @@ func TestLeafNodeInterestPropagationDaisychain(t *testing.T) { checkSubInterest(t, sAA, "$G", "foo", time.Second) // failure issue 2448 } +func TestLeafNodeQueueGroupDistribution(t *testing.T) { + hc := createClusterWithName(t, "HUB", 3) + defer hc.shutdown() + + // Now have a cluster of leafnodes with each one connecting to corresponding HUB(n) node. + c1 := ` + server_name: LEAF1 + listen: 127.0.0.1:-1 + cluster { name: ln22, listen: 127.0.0.1:-1 } + leafnodes { remotes = [{ url: nats-leaf://127.0.0.1:%d }] } + ` + lconf1 := createConfFile(t, []byte(fmt.Sprintf(c1, hc.opts[0].LeafNode.Port))) + ln1, lopts1 := RunServerWithConfig(lconf1) + defer ln1.Shutdown() + + c2 := ` + server_name: LEAF2 + listen: 127.0.0.1:-1 + cluster { name: ln22, listen: 127.0.0.1:-1, routes = [ nats-route://127.0.0.1:%d] } + leafnodes { remotes = [{ url: nats-leaf://127.0.0.1:%d }] } + ` + lconf2 := createConfFile(t, []byte(fmt.Sprintf(c2, lopts1.Cluster.Port, hc.opts[1].LeafNode.Port))) + ln2, _ := RunServerWithConfig(lconf2) + defer ln2.Shutdown() + + c3 := ` + server_name: LEAF3 + listen: 127.0.0.1:-1 + cluster { name: ln22, listen: 127.0.0.1:-1, routes = [ nats-route://127.0.0.1:%d] } + leafnodes { remotes = [{ url: nats-leaf://127.0.0.1:%d }] } + ` + lconf3 := createConfFile(t, []byte(fmt.Sprintf(c3, lopts1.Cluster.Port, hc.opts[2].LeafNode.Port))) + ln3, _ := RunServerWithConfig(lconf3) + defer ln3.Shutdown() + + // Check leaf cluster is formed and all connected to the HUB. + lnServers := []*Server{ln1, ln2, ln3} + checkClusterFormed(t, lnServers...) + for _, s := range lnServers { + checkLeafNodeConnected(t, s) + } + // Check each node in the hub has 1 connection from the leaf cluster. 
+ for i := 0; i < 3; i++ { + checkLeafNodeConnectedCount(t, hc.servers[i], 1) + } + + // Create a client and qsub on LEAF1 and LEAF2. + nc1 := natsConnect(t, ln1.ClientURL()) + defer nc1.Close() + var qsub1Count atomic.Int32 + natsQueueSub(t, nc1, "foo", "queue1", func(_ *nats.Msg) { + qsub1Count.Add(1) + }) + natsFlush(t, nc1) + + nc2 := natsConnect(t, ln2.ClientURL()) + defer nc2.Close() + var qsub2Count atomic.Int32 + natsQueueSub(t, nc2, "foo", "queue1", func(_ *nats.Msg) { + qsub2Count.Add(1) + }) + natsFlush(t, nc2) + + // Make sure that the propagation interest is done before sending. + for _, s := range hc.servers { + gacc := s.GlobalAccount() + checkFor(t, time.Second, 15*time.Millisecond, func() error { + if n := gacc.Interest("foo"); n != 2 { + return fmt.Errorf("Expected interest for %q to be 2, got %v", "foo", n) + } + return nil + }) + } + + sendAndCheck := func(idx int) { + t.Helper() + nchub := natsConnect(t, hc.servers[idx].ClientURL()) + defer nchub.Close() + total := 1000 + for i := 0; i < total; i++ { + natsPub(t, nchub, "foo", []byte("from hub")) + } + checkFor(t, time.Second, 15*time.Millisecond, func() error { + if trecv := int(qsub1Count.Load() + qsub2Count.Load()); trecv != total { + return fmt.Errorf("Expected %v messages, got %v", total, trecv) + } + return nil + }) + // Now that we have made sure that all messages were received, + // check that qsub1 and qsub2 are getting at least some. + if n := int(qsub1Count.Load()); n <= total/10 { + t.Fatalf("Expected qsub1 to get some messages, but got %v", n) + } + if n := int(qsub2Count.Load()); n <= total/10 { + t.Fatalf("Expected qsub2 to get some messages, but got %v", n) + } + // Reset the counters. + qsub1Count.Store(0) + qsub2Count.Store(0) + } + // Send from HUB1 + sendAndCheck(0) + // Send from HUB2 + sendAndCheck(1) + // Send from HUB3 + sendAndCheck(2) +} + func TestLeafNodeQueueGroupWithLateLNJoin(t *testing.T) { /* @@ -4667,6 +4774,7 @@ func TestLeafNodePermsSuppressSubs(t *testing.T) { // Connect client to the hub. nc, err := nats.Connect(s.ClientURL()) require_NoError(t, err) + defer nc.Close() // This should not be seen on leafnode side since we only allow pub to "foo" _, err = nc.SubscribeSync("baz") @@ -6745,6 +6853,7 @@ func TestLeafNodeWithWeightedDQRequestsToSuperClusterWithStreamImportAccounts(t // Now connect and send responses from EFG in cloud. nc, _ = jsClientConnect(t, sc.randomServer(), nats.UserInfo("efg", "p")) + defer nc.Close() for i := 0; i < 100; i++ { require_NoError(t, nc.Publish("RESPONSE", []byte("OK"))) @@ -6877,6 +6986,7 @@ func TestLeafNodeWithWeightedDQResponsesWithStreamImportAccountsWithUnsub(t *tes // Now connect and send responses from EFG in cloud. 
nc, _ := jsClientConnect(t, c.randomServer(), nats.UserInfo("efg", "p")) + defer nc.Close() for i := 0; i < 100; i++ { require_NoError(t, nc.Publish("RESPONSE", []byte("OK"))) } @@ -7114,15 +7224,15 @@ func TestLeafNodeTwoRemotesToSameHubAccountWithClusters(t *testing.T) { nc := natsConnect(t, s.ClientURL(), nats.UserInfo(user, "pwd")) conns = append(conns, nc) } - for _, nc := range conns { - defer nc.Close() - } createConn(sh1, "HA") createConn(sh2, "HA") createConn(sp1, "A") createConn(sp2, "A") createConn(sp1, "B") createConn(sp2, "B") + for _, nc := range conns { + defer nc.Close() + } check := func(subConn *nats.Conn, subj string, checkA, checkB bool) { t.Helper() diff --git a/server/monitor_test.go b/server/monitor_test.go index a3c09136167..694f5ae4199 100644 --- a/server/monitor_test.go +++ b/server/monitor_test.go @@ -4980,6 +4980,7 @@ func TestServerIDZRequest(t *testing.T) { nc, err := nats.Connect(s.ClientURL(), nats.UserInfo("admin", "s3cr3t!")) require_NoError(t, err) + defer nc.Close() subject := fmt.Sprintf(serverPingReqSubj, "IDZ") resp, err := nc.Request(subject, nil, time.Second) @@ -5453,9 +5454,17 @@ func TestHealthzStatusError(t *testing.T) { // Intentionally causing an error in readyForConnections(). // Note: Private field access, taking advantage of having the tests in the same package. + s.mu.Lock() + sl := s.listener s.listener = nil + s.mu.Unlock() checkHealthzEndpoint(t, s.MonitorAddr().String(), http.StatusInternalServerError, "error") + + // Restore for proper shutdown. + s.mu.Lock() + s.listener = sl + s.mu.Unlock() } func TestHealthzStatusUnavailable(t *testing.T) { diff --git a/server/raft_test.go b/server/raft_test.go index 4fef89d27fb..73e4da321d9 100644 --- a/server/raft_test.go +++ b/server/raft_test.go @@ -1042,7 +1042,9 @@ func TestNRGTermNoDecreaseAfterWALReset(t *testing.T) { fn.processAppendEntry(ae, fn.aesub) require_Equal(t, fn.term, 20) // Follower should reject and the term stays the same. + fn.Lock() fn.resetWAL() + fn.Unlock() fn.processAppendEntry(ae, fn.aesub) require_Equal(t, fn.term, 20) // Follower should reject again, even after reset, term stays the same. } diff --git a/server/routes_test.go b/server/routes_test.go index 9aa9df55f84..fca2d96e8cb 100644 --- a/server/routes_test.go +++ b/server/routes_test.go @@ -4318,9 +4318,11 @@ func TestRouteSlowConsumerRecover(t *testing.T) { ncA, err := nats.Connect(s1.Addr().String()) require_NoError(t, err) + defer ncA.Close() ncB, err := nats.Connect(s2.Addr().String()) require_NoError(t, err) + defer ncB.Close() var wg sync.WaitGroup ncB.Subscribe("test", func(*nats.Msg) { diff --git a/server/server_test.go b/server/server_test.go index 037c40e837a..0f46b64216a 100644 --- a/server/server_test.go +++ b/server/server_test.go @@ -239,6 +239,9 @@ func TestTLSMinVersionConfig(t *testing.T) { } opts = append(opts, nats.RootCAs("../test/configs/certs/ca.pem")) nc, err := nats.Connect(fmt.Sprintf("tls://localhost:%d", o.Port), opts...) 
+ if err == nil { + defer nc.Close() + } if expectedErr == nil { if err != nil { t.Fatalf("Unexpected error: %v", err) diff --git a/server/sublist.go b/server/sublist.go index d8c19880b3a..657666f1afb 100644 --- a/server/sublist.go +++ b/server/sublist.go @@ -640,7 +640,9 @@ func (s *Sublist) hasInterest(subject string, doLock bool, np, nq *int) bool { if r, ok := s.cache[subject]; ok { if np != nil && nq != nil { *np += len(r.psubs) - *nq += len(r.qsubs) + for _, qsub := range r.qsubs { + *nq += len(qsub) + } } matched = len(r.psubs)+len(r.qsubs) > 0 } @@ -798,7 +800,9 @@ func matchLevelForAny(l *level, toks []string, np, nq *int) bool { if l.fwc != nil { if np != nil && nq != nil { *np += len(l.fwc.psubs) - *nq += len(l.fwc.qsubs) + for _, qsub := range l.fwc.qsubs { + *nq += len(qsub) + } } return true } @@ -817,14 +821,18 @@ func matchLevelForAny(l *level, toks []string, np, nq *int) bool { if n != nil { if np != nil && nq != nil { *np += len(n.psubs) - *nq += len(n.qsubs) + for _, qsub := range n.qsubs { + *nq += len(qsub) + } } return len(n.plist) > 0 || len(n.psubs) > 0 || len(n.qsubs) > 0 } if pwc != nil { if np != nil && nq != nil { *np += len(pwc.psubs) - *nq += len(pwc.qsubs) + for _, qsub := range pwc.qsubs { + *nq += len(qsub) + } } return len(pwc.plist) > 0 || len(pwc.psubs) > 0 || len(pwc.qsubs) > 0 } diff --git a/server/sublist_test.go b/server/sublist_test.go index fcaffaa3327..2bc3a9958b8 100644 --- a/server/sublist_test.go +++ b/server/sublist_test.go @@ -1863,13 +1863,24 @@ func TestSublistNumInterest(t *testing.T) { require_NumInterest(t, "foo", 0, 2) require_NumInterest(t, "foo.bar", 0, 0) + // Add a second qsub to the second queue group + qsub3 := newQSub("foo", "baz") + sl.Insert(qsub3) + require_NumInterest(t, "foo", 0, 3) + require_NumInterest(t, "foo.bar", 0, 0) + // Remove first queue sl.Remove(qsub) + require_NumInterest(t, "foo", 0, 2) + require_NumInterest(t, "foo.bar", 0, 0) + + // Remove second + sl.Remove(qsub2) require_NumInterest(t, "foo", 0, 1) require_NumInterest(t, "foo.bar", 0, 0) // Remove last. - sl.Remove(qsub2) + sl.Remove(qsub3) require_NumInterest(t, "foo", 0, 0) require_NumInterest(t, "foo.bar", 0, 0) @@ -1886,18 +1897,32 @@ func TestSublistNumInterest(t *testing.T) { require_NumInterest(t, "foo", 0, 0) require_NumInterest(t, "foo.bar", 0, 2) require_NumInterest(t, "foo.bar.baz", 0, 0) + + qsub3 = newQSub("foo.*", "baz") + sl.Insert(qsub3) + require_NumInterest(t, "foo", 0, 0) + require_NumInterest(t, "foo.bar", 0, 3) + require_NumInterest(t, "foo.bar.baz", 0, 0) + // Remove first queue sl.Remove(qsub) require_NumInterest(t, "foo", 0, 0) + require_NumInterest(t, "foo.bar", 0, 2) + require_NumInterest(t, "foo.bar.baz", 0, 0) + + // Remove second + sl.Remove(qsub2) + require_NumInterest(t, "foo", 0, 0) require_NumInterest(t, "foo.bar", 0, 1) require_NumInterest(t, "foo.bar.baz", 0, 0) // Remove last - sl.Remove(qsub2) + sl.Remove(qsub3) require_NumInterest(t, "foo", 0, 0) require_NumInterest(t, "foo.bar", 0, 0) require_NumInterest(t, "foo.bar.baz", 0, 0) + // With > wildcard qsub = newQSub("foo.>", "bar") sl.Insert(qsub) require_NumInterest(t, "foo", 0, 0) @@ -1911,14 +1936,27 @@ func TestSublistNumInterest(t *testing.T) { require_NumInterest(t, "foo.bar", 0, 2) require_NumInterest(t, "foo.bar.baz", 0, 2) + // Add another queue to second group. 
+ qsub3 = newQSub("foo.>", "baz") + sl.Insert(qsub3) + require_NumInterest(t, "foo", 0, 0) + require_NumInterest(t, "foo.bar", 0, 3) + require_NumInterest(t, "foo.bar.baz", 0, 3) + // Remove first queue sl.Remove(qsub) require_NumInterest(t, "foo", 0, 0) + require_NumInterest(t, "foo.bar", 0, 2) + require_NumInterest(t, "foo.bar.baz", 0, 2) + + // Remove second + sl.Remove(qsub2) + require_NumInterest(t, "foo", 0, 0) require_NumInterest(t, "foo.bar", 0, 1) require_NumInterest(t, "foo.bar.baz", 0, 1) // Remove last - sl.Remove(qsub2) + sl.Remove(qsub3) require_NumInterest(t, "foo", 0, 0) require_NumInterest(t, "foo.bar", 0, 0) require_NumInterest(t, "foo.bar.baz", 0, 0)
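The sublist.go hunks above change how queue interest is counted: instead of adding len(r.qsubs) (the number of queue groups), the code now sums len(qsub) over every group, i.e. it counts queue subscribers rather than queue groups. Below is a minimal standalone sketch of that counting rule. The types and the countInterest helper are simplified stand-ins for illustration only, not the server's actual Sublist, subscription, or SublistResult implementations.

package main

import "fmt"

// subscription is a stand-in for the server's *subscription; only the
// queue name matters for this sketch.
type subscription struct {
	queue string
}

// result mirrors the general shape of a sublist match result: plain subs
// in one slice, queue subs grouped per queue name.
type result struct {
	psubs []*subscription
	qsubs [][]*subscription // one inner slice per queue group
}

// countInterest returns the number of plain subscriptions and the total
// number of queue subscribers. Queue interest is the sum of the members
// of every group, not the number of groups, which is the adjustment the
// diff applies in hasInterest and matchLevelForAny.
func countInterest(r *result) (np, nq int) {
	np = len(r.psubs)
	for _, group := range r.qsubs {
		nq += len(group)
	}
	return np, nq
}

func main() {
	r := &result{
		psubs: []*subscription{{}},
		qsubs: [][]*subscription{
			{{queue: "bar"}, {queue: "bar"}}, // two members of group "bar"
			{{queue: "baz"}},                 // one member of group "baz"
		},
	}
	np, nq := countInterest(r)
	// Prints plain=1 queue=3; the pre-fix logic would have reported queue=2.
	fmt.Printf("plain=%d queue=%d\n", np, nq)
}

This mirrors the expectations added in TestSublistNumInterest, where inserting a second member into the second queue group raises the expected queue interest on "foo" from 2 to 3.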