Skip to content

Commit

Permalink
[FIXED] LeafNode's queue group load balancing and Sublist.NumInterest (
Browse files Browse the repository at this point in the history
…#5982)

While writing the test, I needed to make sure that each server in
the hub has registered interest for 2 queue subscribers from the
same group. I noticed that `Sublist.NumInterest()` (that I was
invoking from `Account.Interest()` was returning 1, even after
I knew that the propagation should have happened. It turns out
that `NumInterest()` was returning the number of queue groups, not
the number of queue subs in all those queue groups.
    
For the leafnode queue balancing issue, the code was favoring
local/routed queue subscriptions, so in the described issue,
the message would always go from HUB1->HUB2->LEAF2->QSub instead
of HUB1->LEAF1->QSub.
    
Since we had another test that was a bit reversed where we had
a HUB and LEAF1<->LEAF2 connecting to HUB and a qsub on both
HUB and LEAF1 and requests originated from LEAF2, and we were
expecting all responses to come from LEAF1 (instead of the
responder on HUB), I went with the following approach:
    
If the message originates from a client that connects to a server
that has a connection from a remote LEAF, then we pick that LEAF the
same as if it was a local client or routed server.
However, if the client connects to a server that has a leaf
connection to another server, then we keep track of the sub
but do not sent to that one if we have local or routed qsubs.
    
This makes the 2 tests pass, solving the new test and maintaining
the behavior for the old test.

Resolves #5972 

Signed-off-by: Ivan Kozlovic <[email protected]>
  • Loading branch information
derekcollison authored Oct 10, 2024
2 parents 8351e43 + 0651c56 commit 7e9c93f
Show file tree
Hide file tree
Showing 13 changed files with 253 additions and 60 deletions.
35 changes: 27 additions & 8 deletions server/auth_callout_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -250,6 +250,7 @@ func TestAuthCalloutBasics(t *testing.T) {

// This one will use callout since not defined in server config.
nc := at.Connect(nats.UserInfo("dlc", "zzz"))
defer nc.Close()

resp, err := nc.Request(userDirectInfoSubj, nil, time.Second)
require_NoError(t, err)
Expand Down Expand Up @@ -319,6 +320,7 @@ func TestAuthCalloutMultiAccounts(t *testing.T) {

// This one will use callout since not defined in server config.
nc := at.Connect(nats.UserInfo("dlc", "zzz"))
defer nc.Close()

resp, err := nc.Request(userDirectInfoSubj, nil, time.Second)
require_NoError(t, err)
Expand Down Expand Up @@ -373,13 +375,13 @@ func TestAuthCalloutAllowedAccounts(t *testing.T) {
t.Helper()

var nc *nats.Conn
defer nc.Close()
// Assume no auth user.
if password == "" {
nc = at.Connect()
} else {
nc = at.Connect(nats.UserInfo(user, password))
}
defer nc.Close()

resp, err := nc.Request(userDirectInfoSubj, nil, time.Second)
require_NoError(t, err)
Expand Down Expand Up @@ -470,6 +472,7 @@ func TestAuthCalloutClientTLSCerts(t *testing.T) {
nats.ClientCert("../test/configs/certs/tlsauth/client2.pem", "../test/configs/certs/tlsauth/client2-key.pem"),
nats.RootCAs("../test/configs/certs/tlsauth/ca.pem"),
)
defer nc.Close()

resp, err := nc.Request(userDirectInfoSubj, nil, time.Second)
require_NoError(t, err)
Expand Down Expand Up @@ -521,6 +524,7 @@ func TestAuthCalloutVerifiedUserCalloutsWithSig(t *testing.T) {
require_NoError(t, err)

nc := ac.Connect(nkeyOpt)
defer nc.Close()

// Make sure that the callout was called.
if atomic.LoadUint32(&callouts) != 1 {
Expand Down Expand Up @@ -740,6 +744,7 @@ func TestAuthCalloutOperatorModeBasics(t *testing.T) {
// Send correct token. This should switch us to the test account.
nc := ac.Connect(nats.UserCredentials(creds), nats.Token(secretToken))
require_NoError(t, err)
defer nc.Close()

resp, err = nc.Request(userDirectInfoSubj, nil, time.Second)
require_NoError(t, err)
Expand All @@ -760,6 +765,7 @@ func TestAuthCalloutOperatorModeBasics(t *testing.T) {
// Send the signing key token. This should switch us to the test account, but the user
// is signed with the account signing key
nc = ac.Connect(nats.UserCredentials(creds), nats.Token(skKeyToken))
defer nc.Close()

resp, err = nc.Request(userDirectInfoSubj, nil, time.Second)
require_NoError(t, err)
Expand All @@ -779,6 +785,7 @@ func TestAuthCalloutOperatorModeBasics(t *testing.T) {
// is signed with the account signing key
nc = ac.Connect(nats.UserCredentials(creds), nats.Token(scopedToken))
require_NoError(t, err)
defer nc.Close()

resp, err = nc.Request(userDirectInfoSubj, nil, time.Second)
require_NoError(t, err)
Expand Down Expand Up @@ -1004,10 +1011,12 @@ func TestAuthCalloutServerConfigEncryption(t *testing.T) {
ac := NewAuthTest(t, conf, handler, nats.UserInfo("auth", "pwd"))
defer ac.Cleanup()

ac.Connect(nats.UserInfo("dlc", "zzz"))
nc := ac.Connect(nats.UserInfo("dlc", "zzz"))
defer nc.Close()

// Authorization services can optionally encrypt the responses using the server's public xkey.
ac.Connect(nats.UserInfo("dlc", "xxx"))
nc = ac.Connect(nats.UserInfo("dlc", "xxx"))
defer nc.Close()
}

func TestAuthCalloutOperatorModeEncryption(t *testing.T) {
Expand Down Expand Up @@ -1099,10 +1108,12 @@ func TestAuthCalloutOperatorModeEncryption(t *testing.T) {
defer removeFile(t, creds)

// This will receive an encrypted request to the auth service but send plaintext response.
ac.Connect(nats.UserCredentials(creds), nats.Token(tokenA))
nc := ac.Connect(nats.UserCredentials(creds), nats.Token(tokenA))
defer nc.Close()

// This will receive an encrypted request to the auth service and send an encrypted response.
ac.Connect(nats.UserCredentials(creds), nats.Token(tokenB))
nc = ac.Connect(nats.UserCredentials(creds), nats.Token(tokenB))
defer nc.Close()
}

func TestAuthCalloutServerTags(t *testing.T) {
Expand Down Expand Up @@ -1130,7 +1141,8 @@ func TestAuthCalloutServerTags(t *testing.T) {
ac := NewAuthTest(t, conf, handler, nats.UserInfo("auth", "pwd"))
defer ac.Cleanup()

ac.Connect()
nc := ac.Connect()
defer nc.Close()

tags := <-tch
require_True(t, len(tags) == 2)
Expand Down Expand Up @@ -1163,7 +1175,8 @@ func TestAuthCalloutServerClusterAndVersion(t *testing.T) {
ac := NewAuthTest(t, conf, handler, nats.UserInfo("auth", "pwd"))
defer ac.Cleanup()

ac.Connect()
nc := ac.Connect()
defer nc.Close()

cluster := <-ch
require_True(t, cluster == "HUB")
Expand Down Expand Up @@ -1266,7 +1279,8 @@ func TestAuthCalloutAuthErrEvents(t *testing.T) {
require_NoError(t, err)

// This one will use callout since not defined in server config.
ac.Connect(nats.UserInfo("dlc", "zzz"))
nc := ac.Connect(nats.UserInfo("dlc", "zzz"))
defer nc.Close()
checkSubsPending(t, sub, 0)

checkAuthErrEvent := func(user, pass, reason string) {
Expand Down Expand Up @@ -1326,6 +1340,7 @@ func TestAuthCalloutConnectEvents(t *testing.T) {

// Setup system user.
snc := ac.Connect(nats.UserInfo("admin", "s3cr3t!"))
defer snc.Close()

// Allow this connect event to pass us by..
time.Sleep(250 * time.Millisecond)
Expand Down Expand Up @@ -1697,6 +1712,7 @@ func TestAuthCalloutOperator_AnyAccount(t *testing.T) {
// Send correct token. This should switch us to the A account.
nc := ac.Connect(nats.UserCredentials(creds), nats.Token("PutMeInA"))
require_NoError(t, err)
defer nc.Close()

resp, err = nc.Request(userDirectInfoSubj, nil, time.Second)
require_NoError(t, err)
Expand All @@ -1708,6 +1724,7 @@ func TestAuthCalloutOperator_AnyAccount(t *testing.T) {

nc = ac.Connect(nats.UserCredentials(creds), nats.Token("PutMeInB"))
require_NoError(t, err)
defer nc.Close()

resp, err = nc.Request(userDirectInfoSubj, nil, time.Second)
require_NoError(t, err)
Expand Down Expand Up @@ -1785,6 +1802,7 @@ func TestAuthCalloutWSClientTLSCerts(t *testing.T) {
nats.ClientCert("../test/configs/certs/tlsauth/client2.pem", "../test/configs/certs/tlsauth/client2-key.pem"),
nats.RootCAs("../test/configs/certs/tlsauth/ca.pem"),
)
defer nc.Close()

resp, err := nc.Request(userDirectInfoSubj, nil, time.Second)
require_NoError(t, err)
Expand Down Expand Up @@ -1992,6 +2010,7 @@ func TestOperatorModeUserRevocation(t *testing.T) {
// connect the system user
sysNC, err := ac.NewClient(nats.UserCredentials(sysCreds))
require_NoError(t, err)
defer sysNC.Close()

// Bearer token etc..
// This is used by all users, and the customization will be in other connect args.
Expand Down
1 change: 1 addition & 0 deletions server/auth_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -292,6 +292,7 @@ func TestNoAuthUserNkey(t *testing.T) {

// Make sure we connect ok and to the correct account.
nc := natsConnect(t, s.ClientURL())
defer nc.Close()
resp, err := nc.Request(userDirectInfoSubj, nil, time.Second)
require_NoError(t, err)
response := ServerAPIResponse{Data: &UserInfo{}}
Expand Down
15 changes: 8 additions & 7 deletions server/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -4831,17 +4831,18 @@ func (c *client) processMsgResults(acc *Account, r *SublistResult, msg, deliver,
// Here we just care about a client or leaf and skipping a leaf and preferring locals.
if dst := sub.client.kind; dst == ROUTER || dst == LEAF {
if (src == LEAF || src == CLIENT) && dst == LEAF {
// Remember that leaf in case we don't find any other candidate.
if rsub == nil {
rsub = sub
}
continue
} else {
c.addSubToRouteTargets(sub)
// Clear rsub since we added a sub.
rsub = nil
if flags&pmrCollectQueueNames != 0 {
queues = append(queues, sub.queue)
// We would be picking a route, but if we had remembered a "hub" leaf,
// then pick that one instead of the route.
if rsub != nil && rsub.client.kind == LEAF && rsub.client.isHubLeafNode() {
break
}
rsub = sub
}
break
}
Expand Down Expand Up @@ -4923,8 +4924,8 @@ func (c *client) processMsgResults(acc *Account, r *SublistResult, msg, deliver,
}

if rsub != nil {
// If we are here we tried to deliver to a local qsub
// but failed. So we will send it to a remote or leaf node.
// We are here if we have selected a leaf or route as the destination,
// or if we tried to deliver to a local qsub but failed.
c.addSubToRouteTargets(rsub)
if flags&pmrCollectQueueNames != 0 {
queues = append(queues, rsub.queue)
Expand Down
5 changes: 4 additions & 1 deletion server/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2624,6 +2624,7 @@ func TestTLSClientHandshakeFirst(t *testing.T) {
}
nc, err := nats.Connect(fmt.Sprintf("tls://localhost:%d", o.Port), opts...)
if expectedOk {
defer nc.Close()
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
Expand Down Expand Up @@ -3072,15 +3073,17 @@ func TestClientFlushOutboundNoSlowConsumer(t *testing.T) {

wait := make(chan error)

nca, err := nats.Connect(proxy.clientURL())
nca, err := nats.Connect(proxy.clientURL(), nats.NoCallbacksAfterClientClose())
require_NoError(t, err)
defer nca.Close()
nca.SetDisconnectErrHandler(func(c *nats.Conn, err error) {
wait <- err
close(wait)
})

ncb, err := nats.Connect(s.ClientURL())
require_NoError(t, err)
defer ncb.Close()

_, err = nca.Subscribe("test", func(msg *nats.Msg) {
wait <- nil
Expand Down
10 changes: 3 additions & 7 deletions server/gateway_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -804,12 +804,8 @@ func testFatalErrorOnStart(t *testing.T, o *Options, errTxt string) {
defer s.Shutdown()
l := &captureFatalLogger{fatalCh: make(chan string, 1)}
s.SetLogger(l, false, false)
wg := sync.WaitGroup{}
wg.Add(1)
go func() {
s.Start()
wg.Done()
}()
// This does not block
s.Start()
select {
case e := <-l.fatalCh:
if !strings.Contains(e, errTxt) {
Expand All @@ -819,7 +815,7 @@ func testFatalErrorOnStart(t *testing.T, o *Options, errTxt string) {
t.Fatal("Should have got a fatal error")
}
s.Shutdown()
wg.Wait()
s.WaitForShutdown()
}

func TestGatewayListenError(t *testing.T) {
Expand Down
1 change: 1 addition & 0 deletions server/jwt_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7026,6 +7026,7 @@ func TestJWTImportsOnServerRestartAndClientsReconnect(t *testing.T) {
for range time.NewTicker(200 * time.Millisecond).C {
select {
case <-ctx.Done():
return
default:
}
send(t)
Expand Down
Loading

0 comments on commit 7e9c93f

Please sign in to comment.