Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cherry-picks for 2.10.22-RC.2 #5984

Merged
merged 4 commits into from
Oct 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ language: go
go:
# This should be quoted or use .x, but should not be unquoted.
# Remember that a YAML bare float drops trailing zeroes.
- "1.22.7"
- "1.22.8"
- "1.21.13"

go_import_path: github.com/nats-io/nats-server
Expand Down
4 changes: 2 additions & 2 deletions server/accounts.go
Original file line number Diff line number Diff line change
Expand Up @@ -846,8 +846,8 @@ func (a *Account) Interest(subject string) int {
var nms int
a.mu.RLock()
if a.sl != nil {
res := a.sl.Match(subject)
nms = len(res.psubs) + len(res.qsubs)
np, nq := a.sl.NumInterest(subject)
nms = np + nq
}
a.mu.RUnlock()
return nms
Expand Down
33 changes: 26 additions & 7 deletions server/auth_callout_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -250,6 +250,7 @@ func TestAuthCalloutBasics(t *testing.T) {

// This one will use callout since not defined in server config.
nc := at.Connect(nats.UserInfo("dlc", "zzz"))
defer nc.Close()

resp, err := nc.Request(userDirectInfoSubj, nil, time.Second)
require_NoError(t, err)
Expand Down Expand Up @@ -319,6 +320,7 @@ func TestAuthCalloutMultiAccounts(t *testing.T) {

// This one will use callout since not defined in server config.
nc := at.Connect(nats.UserInfo("dlc", "zzz"))
defer nc.Close()

resp, err := nc.Request(userDirectInfoSubj, nil, time.Second)
require_NoError(t, err)
Expand Down Expand Up @@ -388,6 +390,7 @@ func TestAuthCalloutClientTLSCerts(t *testing.T) {
nats.ClientCert("../test/configs/certs/tlsauth/client2.pem", "../test/configs/certs/tlsauth/client2-key.pem"),
nats.RootCAs("../test/configs/certs/tlsauth/ca.pem"),
)
defer nc.Close()

resp, err := nc.Request(userDirectInfoSubj, nil, time.Second)
require_NoError(t, err)
Expand Down Expand Up @@ -439,6 +442,7 @@ func TestAuthCalloutVerifiedUserCalloutsWithSig(t *testing.T) {
require_NoError(t, err)

nc := ac.Connect(nkeyOpt)
defer nc.Close()

// Make sure that the callout was called.
if atomic.LoadUint32(&callouts) != 1 {
Expand Down Expand Up @@ -658,6 +662,7 @@ func TestAuthCalloutOperatorModeBasics(t *testing.T) {
// Send correct token. This should switch us to the test account.
nc := ac.Connect(nats.UserCredentials(creds), nats.Token(secretToken))
require_NoError(t, err)
defer nc.Close()

resp, err = nc.Request(userDirectInfoSubj, nil, time.Second)
require_NoError(t, err)
Expand All @@ -678,6 +683,7 @@ func TestAuthCalloutOperatorModeBasics(t *testing.T) {
// Send the signing key token. This should switch us to the test account, but the user
// is signed with the account signing key
nc = ac.Connect(nats.UserCredentials(creds), nats.Token(skKeyToken))
defer nc.Close()

resp, err = nc.Request(userDirectInfoSubj, nil, time.Second)
require_NoError(t, err)
Expand All @@ -697,6 +703,7 @@ func TestAuthCalloutOperatorModeBasics(t *testing.T) {
// is signed with the account signing key
nc = ac.Connect(nats.UserCredentials(creds), nats.Token(scopedToken))
require_NoError(t, err)
defer nc.Close()

resp, err = nc.Request(userDirectInfoSubj, nil, time.Second)
require_NoError(t, err)
Expand Down Expand Up @@ -922,10 +929,12 @@ func TestAuthCalloutServerConfigEncryption(t *testing.T) {
ac := NewAuthTest(t, conf, handler, nats.UserInfo("auth", "pwd"))
defer ac.Cleanup()

ac.Connect(nats.UserInfo("dlc", "zzz"))
nc := ac.Connect(nats.UserInfo("dlc", "zzz"))
defer nc.Close()

// Authorization services can optionally encrypt the responses using the server's public xkey.
ac.Connect(nats.UserInfo("dlc", "xxx"))
nc = ac.Connect(nats.UserInfo("dlc", "xxx"))
defer nc.Close()
}

func TestAuthCalloutOperatorModeEncryption(t *testing.T) {
Expand Down Expand Up @@ -1017,10 +1026,12 @@ func TestAuthCalloutOperatorModeEncryption(t *testing.T) {
defer removeFile(t, creds)

// This will receive an encrypted request to the auth service but send plaintext response.
ac.Connect(nats.UserCredentials(creds), nats.Token(tokenA))
nc := ac.Connect(nats.UserCredentials(creds), nats.Token(tokenA))
defer nc.Close()

// This will receive an encrypted request to the auth service and send an encrypted response.
ac.Connect(nats.UserCredentials(creds), nats.Token(tokenB))
nc = ac.Connect(nats.UserCredentials(creds), nats.Token(tokenB))
defer nc.Close()
}

func TestAuthCalloutServerTags(t *testing.T) {
Expand Down Expand Up @@ -1048,7 +1059,8 @@ func TestAuthCalloutServerTags(t *testing.T) {
ac := NewAuthTest(t, conf, handler, nats.UserInfo("auth", "pwd"))
defer ac.Cleanup()

ac.Connect()
nc := ac.Connect()
defer nc.Close()

tags := <-tch
require_True(t, len(tags) == 2)
Expand Down Expand Up @@ -1081,7 +1093,8 @@ func TestAuthCalloutServerClusterAndVersion(t *testing.T) {
ac := NewAuthTest(t, conf, handler, nats.UserInfo("auth", "pwd"))
defer ac.Cleanup()

ac.Connect()
nc := ac.Connect()
defer nc.Close()

cluster := <-ch
require_True(t, cluster == "HUB")
Expand Down Expand Up @@ -1184,7 +1197,8 @@ func TestAuthCalloutAuthErrEvents(t *testing.T) {
require_NoError(t, err)

// This one will use callout since not defined in server config.
ac.Connect(nats.UserInfo("dlc", "zzz"))
nc := ac.Connect(nats.UserInfo("dlc", "zzz"))
defer nc.Close()
checkSubsPending(t, sub, 0)

checkAuthErrEvent := func(user, pass, reason string) {
Expand Down Expand Up @@ -1244,6 +1258,7 @@ func TestAuthCalloutConnectEvents(t *testing.T) {

// Setup system user.
snc := ac.Connect(nats.UserInfo("admin", "s3cr3t!"))
defer snc.Close()

// Allow this connect event to pass us by..
time.Sleep(250 * time.Millisecond)
Expand Down Expand Up @@ -1615,6 +1630,7 @@ func TestAuthCalloutOperator_AnyAccount(t *testing.T) {
// Send correct token. This should switch us to the A account.
nc := ac.Connect(nats.UserCredentials(creds), nats.Token("PutMeInA"))
require_NoError(t, err)
defer nc.Close()

resp, err = nc.Request(userDirectInfoSubj, nil, time.Second)
require_NoError(t, err)
Expand All @@ -1626,6 +1642,7 @@ func TestAuthCalloutOperator_AnyAccount(t *testing.T) {

nc = ac.Connect(nats.UserCredentials(creds), nats.Token("PutMeInB"))
require_NoError(t, err)
defer nc.Close()

resp, err = nc.Request(userDirectInfoSubj, nil, time.Second)
require_NoError(t, err)
Expand Down Expand Up @@ -1703,6 +1720,7 @@ func TestAuthCalloutWSClientTLSCerts(t *testing.T) {
nats.ClientCert("../test/configs/certs/tlsauth/client2.pem", "../test/configs/certs/tlsauth/client2-key.pem"),
nats.RootCAs("../test/configs/certs/tlsauth/ca.pem"),
)
defer nc.Close()

resp, err := nc.Request(userDirectInfoSubj, nil, time.Second)
require_NoError(t, err)
Expand Down Expand Up @@ -1910,6 +1928,7 @@ func TestOperatorModeUserRevocation(t *testing.T) {
// connect the system user
sysNC, err := ac.NewClient(nats.UserCredentials(sysCreds))
require_NoError(t, err)
defer sysNC.Close()

// Bearer token etc..
// This is used by all users, and the customization will be in other connect args.
Expand Down
1 change: 1 addition & 0 deletions server/auth_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -292,6 +292,7 @@ func TestNoAuthUserNkey(t *testing.T) {

// Make sure we connect ok and to the correct account.
nc := natsConnect(t, s.ClientURL())
defer nc.Close()
resp, err := nc.Request(userDirectInfoSubj, nil, time.Second)
require_NoError(t, err)
response := ServerAPIResponse{Data: &UserInfo{}}
Expand Down
25 changes: 13 additions & 12 deletions server/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -3189,7 +3189,7 @@ func (c *client) processUnsub(arg []byte) error {
func (c *client) checkDenySub(subject string) bool {
if denied, ok := c.mperms.dcache[subject]; ok {
return denied
} else if r := c.mperms.deny.Match(subject); len(r.psubs) != 0 {
} else if np, _ := c.mperms.deny.NumInterest(subject); np != 0 {
c.mperms.dcache[subject] = true
return true
} else {
Expand Down Expand Up @@ -3711,13 +3711,13 @@ func (c *client) pubAllowedFullCheck(subject string, fullCheck, hasLock bool) bo
allowed := true
// Cache miss, check allow then deny as needed.
if c.perms.pub.allow != nil {
r := c.perms.pub.allow.Match(subject)
allowed = len(r.psubs) != 0
np, _ := c.perms.pub.allow.NumInterest(subject)
allowed = np != 0
}
// If we have a deny list and are currently allowed, check that as well.
if allowed && c.perms.pub.deny != nil {
r := c.perms.pub.deny.Match(subject)
allowed = len(r.psubs) == 0
np, _ := c.perms.pub.deny.NumInterest(subject)
allowed = np == 0
}

// If we are currently not allowed but we are tracking reply subjects
Expand Down Expand Up @@ -4649,17 +4649,18 @@ func (c *client) processMsgResults(acc *Account, r *SublistResult, msg, deliver,
// Here we just care about a client or leaf and skipping a leaf and preferring locals.
if dst := sub.client.kind; dst == ROUTER || dst == LEAF {
if (src == LEAF || src == CLIENT) && dst == LEAF {
// Remember that leaf in case we don't find any other candidate.
if rsub == nil {
rsub = sub
}
continue
} else {
c.addSubToRouteTargets(sub)
// Clear rsub since we added a sub.
rsub = nil
if flags&pmrCollectQueueNames != 0 {
queues = append(queues, sub.queue)
// We would be picking a route, but if we had remembered a "hub" leaf,
// then pick that one instead of the route.
if rsub != nil && rsub.client.kind == LEAF && rsub.client.isHubLeafNode() {
break
}
rsub = sub
}
break
}
Expand Down Expand Up @@ -4708,8 +4709,8 @@ func (c *client) processMsgResults(acc *Account, r *SublistResult, msg, deliver,
}

if rsub != nil {
// If we are here we tried to deliver to a local qsub
// but failed. So we will send it to a remote or leaf node.
// We are here if we have selected a leaf or route as the destination,
// or if we tried to deliver to a local qsub but failed.
c.addSubToRouteTargets(rsub)
if flags&pmrCollectQueueNames != 0 {
queues = append(queues, rsub.queue)
Expand Down
5 changes: 4 additions & 1 deletion server/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2624,6 +2624,7 @@ func TestTLSClientHandshakeFirst(t *testing.T) {
}
nc, err := nats.Connect(fmt.Sprintf("tls://localhost:%d", o.Port), opts...)
if expectedOk {
defer nc.Close()
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
Expand Down Expand Up @@ -2979,15 +2980,17 @@ func TestClientFlushOutboundNoSlowConsumer(t *testing.T) {

wait := make(chan error)

nca, err := nats.Connect(proxy.clientURL())
nca, err := nats.Connect(proxy.clientURL(), nats.NoCallbacksAfterClientClose())
require_NoError(t, err)
defer nca.Close()
nca.SetDisconnectErrHandler(func(c *nats.Conn, err error) {
wait <- err
close(wait)
})

ncb, err := nats.Connect(s.ClientURL())
require_NoError(t, err)
defer ncb.Close()

_, err = nca.Subscribe("test", func(msg *nats.Msg) {
wait <- nil
Expand Down
10 changes: 3 additions & 7 deletions server/gateway_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -804,12 +804,8 @@ func testFatalErrorOnStart(t *testing.T, o *Options, errTxt string) {
defer s.Shutdown()
l := &captureFatalLogger{fatalCh: make(chan string, 1)}
s.SetLogger(l, false, false)
wg := sync.WaitGroup{}
wg.Add(1)
go func() {
s.Start()
wg.Done()
}()
// This does not block
s.Start()
select {
case e := <-l.fatalCh:
if !strings.Contains(e, errTxt) {
Expand All @@ -819,7 +815,7 @@ func testFatalErrorOnStart(t *testing.T, o *Options, errTxt string) {
t.Fatal("Should have got a fatal error")
}
s.Shutdown()
wg.Wait()
s.WaitForShutdown()
}

func TestGatewayListenError(t *testing.T) {
Expand Down
1 change: 1 addition & 0 deletions server/jwt_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7026,6 +7026,7 @@ func TestJWTImportsOnServerRestartAndClientsReconnect(t *testing.T) {
for range time.NewTicker(200 * time.Millisecond).C {
select {
case <-ctx.Done():
return
default:
}
send(t)
Expand Down
Loading