Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FIXED] LeafNode's queue group load balancing and Sublist.NumInterest #5982

Merged
merged 2 commits into from
Oct 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 27 additions & 8 deletions server/auth_callout_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -250,6 +250,7 @@ func TestAuthCalloutBasics(t *testing.T) {

// This one will use callout since not defined in server config.
nc := at.Connect(nats.UserInfo("dlc", "zzz"))
defer nc.Close()

resp, err := nc.Request(userDirectInfoSubj, nil, time.Second)
require_NoError(t, err)
Expand Down Expand Up @@ -319,6 +320,7 @@ func TestAuthCalloutMultiAccounts(t *testing.T) {

// This one will use callout since not defined in server config.
nc := at.Connect(nats.UserInfo("dlc", "zzz"))
defer nc.Close()

resp, err := nc.Request(userDirectInfoSubj, nil, time.Second)
require_NoError(t, err)
Expand Down Expand Up @@ -373,13 +375,13 @@ func TestAuthCalloutAllowedAccounts(t *testing.T) {
t.Helper()

var nc *nats.Conn
defer nc.Close()
// Assume no auth user.
if password == "" {
nc = at.Connect()
} else {
nc = at.Connect(nats.UserInfo(user, password))
}
defer nc.Close()

resp, err := nc.Request(userDirectInfoSubj, nil, time.Second)
require_NoError(t, err)
Expand Down Expand Up @@ -470,6 +472,7 @@ func TestAuthCalloutClientTLSCerts(t *testing.T) {
nats.ClientCert("../test/configs/certs/tlsauth/client2.pem", "../test/configs/certs/tlsauth/client2-key.pem"),
nats.RootCAs("../test/configs/certs/tlsauth/ca.pem"),
)
defer nc.Close()

resp, err := nc.Request(userDirectInfoSubj, nil, time.Second)
require_NoError(t, err)
Expand Down Expand Up @@ -521,6 +524,7 @@ func TestAuthCalloutVerifiedUserCalloutsWithSig(t *testing.T) {
require_NoError(t, err)

nc := ac.Connect(nkeyOpt)
defer nc.Close()

// Make sure that the callout was called.
if atomic.LoadUint32(&callouts) != 1 {
Expand Down Expand Up @@ -740,6 +744,7 @@ func TestAuthCalloutOperatorModeBasics(t *testing.T) {
// Send correct token. This should switch us to the test account.
nc := ac.Connect(nats.UserCredentials(creds), nats.Token(secretToken))
require_NoError(t, err)
defer nc.Close()

resp, err = nc.Request(userDirectInfoSubj, nil, time.Second)
require_NoError(t, err)
Expand All @@ -760,6 +765,7 @@ func TestAuthCalloutOperatorModeBasics(t *testing.T) {
// Send the signing key token. This should switch us to the test account, but the user
// is signed with the account signing key
nc = ac.Connect(nats.UserCredentials(creds), nats.Token(skKeyToken))
defer nc.Close()

resp, err = nc.Request(userDirectInfoSubj, nil, time.Second)
require_NoError(t, err)
Expand All @@ -779,6 +785,7 @@ func TestAuthCalloutOperatorModeBasics(t *testing.T) {
// is signed with the account signing key
nc = ac.Connect(nats.UserCredentials(creds), nats.Token(scopedToken))
require_NoError(t, err)
defer nc.Close()

resp, err = nc.Request(userDirectInfoSubj, nil, time.Second)
require_NoError(t, err)
Expand Down Expand Up @@ -1004,10 +1011,12 @@ func TestAuthCalloutServerConfigEncryption(t *testing.T) {
ac := NewAuthTest(t, conf, handler, nats.UserInfo("auth", "pwd"))
defer ac.Cleanup()

ac.Connect(nats.UserInfo("dlc", "zzz"))
nc := ac.Connect(nats.UserInfo("dlc", "zzz"))
defer nc.Close()

// Authorization services can optionally encrypt the responses using the server's public xkey.
ac.Connect(nats.UserInfo("dlc", "xxx"))
nc = ac.Connect(nats.UserInfo("dlc", "xxx"))
defer nc.Close()
}

func TestAuthCalloutOperatorModeEncryption(t *testing.T) {
Expand Down Expand Up @@ -1099,10 +1108,12 @@ func TestAuthCalloutOperatorModeEncryption(t *testing.T) {
defer removeFile(t, creds)

// This will receive an encrypted request to the auth service but send plaintext response.
ac.Connect(nats.UserCredentials(creds), nats.Token(tokenA))
nc := ac.Connect(nats.UserCredentials(creds), nats.Token(tokenA))
defer nc.Close()

// This will receive an encrypted request to the auth service and send an encrypted response.
ac.Connect(nats.UserCredentials(creds), nats.Token(tokenB))
nc = ac.Connect(nats.UserCredentials(creds), nats.Token(tokenB))
defer nc.Close()
}

func TestAuthCalloutServerTags(t *testing.T) {
Expand Down Expand Up @@ -1130,7 +1141,8 @@ func TestAuthCalloutServerTags(t *testing.T) {
ac := NewAuthTest(t, conf, handler, nats.UserInfo("auth", "pwd"))
defer ac.Cleanup()

ac.Connect()
nc := ac.Connect()
defer nc.Close()

tags := <-tch
require_True(t, len(tags) == 2)
Expand Down Expand Up @@ -1163,7 +1175,8 @@ func TestAuthCalloutServerClusterAndVersion(t *testing.T) {
ac := NewAuthTest(t, conf, handler, nats.UserInfo("auth", "pwd"))
defer ac.Cleanup()

ac.Connect()
nc := ac.Connect()
defer nc.Close()

cluster := <-ch
require_True(t, cluster == "HUB")
Expand Down Expand Up @@ -1266,7 +1279,8 @@ func TestAuthCalloutAuthErrEvents(t *testing.T) {
require_NoError(t, err)

// This one will use callout since not defined in server config.
ac.Connect(nats.UserInfo("dlc", "zzz"))
nc := ac.Connect(nats.UserInfo("dlc", "zzz"))
defer nc.Close()
checkSubsPending(t, sub, 0)

checkAuthErrEvent := func(user, pass, reason string) {
Expand Down Expand Up @@ -1326,6 +1340,7 @@ func TestAuthCalloutConnectEvents(t *testing.T) {

// Setup system user.
snc := ac.Connect(nats.UserInfo("admin", "s3cr3t!"))
defer snc.Close()

// Allow this connect event to pass us by..
time.Sleep(250 * time.Millisecond)
Expand Down Expand Up @@ -1697,6 +1712,7 @@ func TestAuthCalloutOperator_AnyAccount(t *testing.T) {
// Send correct token. This should switch us to the A account.
nc := ac.Connect(nats.UserCredentials(creds), nats.Token("PutMeInA"))
require_NoError(t, err)
defer nc.Close()

resp, err = nc.Request(userDirectInfoSubj, nil, time.Second)
require_NoError(t, err)
Expand All @@ -1708,6 +1724,7 @@ func TestAuthCalloutOperator_AnyAccount(t *testing.T) {

nc = ac.Connect(nats.UserCredentials(creds), nats.Token("PutMeInB"))
require_NoError(t, err)
defer nc.Close()

resp, err = nc.Request(userDirectInfoSubj, nil, time.Second)
require_NoError(t, err)
Expand Down Expand Up @@ -1785,6 +1802,7 @@ func TestAuthCalloutWSClientTLSCerts(t *testing.T) {
nats.ClientCert("../test/configs/certs/tlsauth/client2.pem", "../test/configs/certs/tlsauth/client2-key.pem"),
nats.RootCAs("../test/configs/certs/tlsauth/ca.pem"),
)
defer nc.Close()

resp, err := nc.Request(userDirectInfoSubj, nil, time.Second)
require_NoError(t, err)
Expand Down Expand Up @@ -1992,6 +2010,7 @@ func TestOperatorModeUserRevocation(t *testing.T) {
// connect the system user
sysNC, err := ac.NewClient(nats.UserCredentials(sysCreds))
require_NoError(t, err)
defer sysNC.Close()

// Bearer token etc..
// This is used by all users, and the customization will be in other connect args.
Expand Down
1 change: 1 addition & 0 deletions server/auth_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -292,6 +292,7 @@ func TestNoAuthUserNkey(t *testing.T) {

// Make sure we connect ok and to the correct account.
nc := natsConnect(t, s.ClientURL())
defer nc.Close()
resp, err := nc.Request(userDirectInfoSubj, nil, time.Second)
require_NoError(t, err)
response := ServerAPIResponse{Data: &UserInfo{}}
Expand Down
15 changes: 8 additions & 7 deletions server/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -4831,17 +4831,18 @@ func (c *client) processMsgResults(acc *Account, r *SublistResult, msg, deliver,
// Here we just care about a client or leaf and skipping a leaf and preferring locals.
if dst := sub.client.kind; dst == ROUTER || dst == LEAF {
if (src == LEAF || src == CLIENT) && dst == LEAF {
// Remember that leaf in case we don't find any other candidate.
if rsub == nil {
rsub = sub
}
continue
} else {
c.addSubToRouteTargets(sub)
// Clear rsub since we added a sub.
rsub = nil
if flags&pmrCollectQueueNames != 0 {
queues = append(queues, sub.queue)
// We would be picking a route, but if we had remembered a "hub" leaf,
// then pick that one instead of the route.
if rsub != nil && rsub.client.kind == LEAF && rsub.client.isHubLeafNode() {
break
}
rsub = sub
}
break
}
Expand Down Expand Up @@ -4923,8 +4924,8 @@ func (c *client) processMsgResults(acc *Account, r *SublistResult, msg, deliver,
}

if rsub != nil {
// If we are here we tried to deliver to a local qsub
// but failed. So we will send it to a remote or leaf node.
// We are here if we have selected a leaf or route as the destination,
// or if we tried to deliver to a local qsub but failed.
c.addSubToRouteTargets(rsub)
if flags&pmrCollectQueueNames != 0 {
queues = append(queues, rsub.queue)
Expand Down
5 changes: 4 additions & 1 deletion server/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2624,6 +2624,7 @@ func TestTLSClientHandshakeFirst(t *testing.T) {
}
nc, err := nats.Connect(fmt.Sprintf("tls://localhost:%d", o.Port), opts...)
if expectedOk {
defer nc.Close()
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
Expand Down Expand Up @@ -3072,15 +3073,17 @@ func TestClientFlushOutboundNoSlowConsumer(t *testing.T) {

wait := make(chan error)

nca, err := nats.Connect(proxy.clientURL())
nca, err := nats.Connect(proxy.clientURL(), nats.NoCallbacksAfterClientClose())
require_NoError(t, err)
defer nca.Close()
nca.SetDisconnectErrHandler(func(c *nats.Conn, err error) {
wait <- err
close(wait)
})

ncb, err := nats.Connect(s.ClientURL())
require_NoError(t, err)
defer ncb.Close()

_, err = nca.Subscribe("test", func(msg *nats.Msg) {
wait <- nil
Expand Down
10 changes: 3 additions & 7 deletions server/gateway_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -804,12 +804,8 @@ func testFatalErrorOnStart(t *testing.T, o *Options, errTxt string) {
defer s.Shutdown()
l := &captureFatalLogger{fatalCh: make(chan string, 1)}
s.SetLogger(l, false, false)
wg := sync.WaitGroup{}
wg.Add(1)
go func() {
s.Start()
wg.Done()
}()
// This does not block
s.Start()
select {
case e := <-l.fatalCh:
if !strings.Contains(e, errTxt) {
Expand All @@ -819,7 +815,7 @@ func testFatalErrorOnStart(t *testing.T, o *Options, errTxt string) {
t.Fatal("Should have got a fatal error")
}
s.Shutdown()
wg.Wait()
s.WaitForShutdown()
}

func TestGatewayListenError(t *testing.T) {
Expand Down
1 change: 1 addition & 0 deletions server/jwt_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7026,6 +7026,7 @@ func TestJWTImportsOnServerRestartAndClientsReconnect(t *testing.T) {
for range time.NewTicker(200 * time.Millisecond).C {
select {
case <-ctx.Done():
return
default:
}
send(t)
Expand Down
Loading