Skip to content

Commit

Permalink
Follow up on PR #4701 (#4708)
Browse files Browse the repository at this point in the history
Need to handle case of no pooling or pool size of 1.

Signed-off-by: Ivan Kozlovic <[email protected]>
  • Loading branch information
derekcollison authored Oct 25, 2023
2 parents 9671a1f + d2b4bc3 commit 4bad329
Show file tree
Hide file tree
Showing 3 changed files with 117 additions and 68 deletions.
12 changes: 10 additions & 2 deletions server/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -5232,7 +5232,7 @@ func (c *client) reconnect() {

// Check for a solicited route. If it was, start up a reconnect unless
// we are already connected to the other end.
if c.isSolicitedRoute() || retryImplicit {
if didSolicit := c.isSolicitedRoute(); didSolicit || retryImplicit {
srv.mu.Lock()
defer srv.mu.Unlock()

Expand All @@ -5242,7 +5242,7 @@ func (c *client) reconnect() {
rtype := c.route.routeType
rurl := c.route.url
accName := string(c.route.accName)
checkRID := accName == _EMPTY_ && srv.routesPoolSize <= 1 && rid != _EMPTY_
checkRID := accName == _EMPTY_ && srv.getOpts().Cluster.PoolSize < 1 && rid != _EMPTY_
c.mu.Unlock()

// It is possible that the server is being shutdown.
Expand All @@ -5252,6 +5252,14 @@ func (c *client) reconnect() {
}

if checkRID && srv.routes[rid] != nil {
// This is the case of "no pool". Make sure that the registered one
// is upgraded to solicited if the connection trying to reconnect
// was a solicited one.
if didSolicit {
if remote := srv.routes[rid][0]; remote != nil {
upgradeRouteToSolicited(remote, rurl, rtype)
}
}
srv.Debugf("Not attempting reconnect for solicited route, already connected to %q", rid)
return
} else if rid == srv.info.ID {
Expand Down
52 changes: 41 additions & 11 deletions server/route.go
Original file line number Diff line number Diff line change
Expand Up @@ -2019,6 +2019,11 @@ func (s *Server) addRoute(c *client, didSolicit bool, info *Info, accName string
s.mu.Unlock()
return false
}
// Look if there is a solicited route in the pool. If there is one,
// they should all be, so stop at the first.
if url, rtype, hasSolicited := hasSolicitedRoute(conns); hasSolicited {
upgradeRouteToSolicited(c, url, rtype)
}
} else {
// If we solicit, upgrade to solicited all non-solicited routes that
// we may have registered.
Expand All @@ -2027,17 +2032,7 @@ func (s *Server) addRoute(c *client, didSolicit bool, info *Info, accName string
rtype := c.route.routeType
c.mu.Unlock()
for _, r := range conns {
if r != nil {
r.mu.Lock()
if !r.route.didSolicit {
r.route.didSolicit = true
r.route.url = url
}
if rtype == Explicit {
r.route.routeType = Explicit
}
r.mu.Unlock()
}
upgradeRouteToSolicited(r, url, rtype)
}
}
// For all cases (solicited and not) we need to count how many connections
Expand Down Expand Up @@ -2164,6 +2159,41 @@ func (s *Server) addRoute(c *client, didSolicit bool, info *Info, accName string
return !exists
}

func hasSolicitedRoute(conns []*client) (*url.URL, RouteType, bool) {
var url *url.URL
var rtype RouteType
for _, r := range conns {
if r == nil {
continue
}
r.mu.Lock()
if r.route.didSolicit {
url = r.route.url
rtype = r.route.routeType
}
r.mu.Unlock()
if url != nil {
return url, rtype, true
}
}
return nil, 0, false
}

func upgradeRouteToSolicited(r *client, url *url.URL, rtype RouteType) {
if r == nil {
return
}
r.mu.Lock()
if !r.route.didSolicit {
r.route.didSolicit = true
r.route.url = url
}
if rtype == Explicit {
r.route.routeType = Explicit
}
r.mu.Unlock()
}

func handleDuplicateRoute(remote, c *client, setNoReconnect bool) {
// We used to clear some fields when closing a duplicate connection
// to prevent sending INFO protocols for the remotes to update
Expand Down
121 changes: 66 additions & 55 deletions server/routes_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2079,71 +2079,82 @@ func TestRoutePool(t *testing.T) {
}

func TestRoutePoolConnectRace(t *testing.T) {
// This test will have each server point to each other and that is causing
// each one to attempt to connect routes to each other which should lead
// to connections needing to be dropped. We make sure that there is still
// resolution and there is the expected number of routes.
createSrv := func(name string, port int) *Server {
o := DefaultOptions()
o.Port = -1
o.ServerName = name
o.Cluster.PoolSize = 5
o.Cluster.Name = "local"
o.Cluster.Port = port
o.Routes = RoutesFromStr("nats://127.0.0.1:1234,nats://127.0.0.1:1235,nats://127.0.0.1:1236")
s, err := NewServer(o)
if err != nil {
t.Fatalf("Error creating server: %v", err)
}
return s
}
s1 := createSrv("A", 1234)
s2 := createSrv("B", 1235)
s3 := createSrv("C", 1236)
for _, test := range []struct {
name string
poolSize int
}{
{"no pool", -1},
{"pool size 1", 1},
{"pool size 5", 5},
} {
t.Run(test.name, func(t *testing.T) {
// This test will have each server point to each other and that is causing
// each one to attempt to connect routes to each other which should lead
// to connections needing to be dropped. We make sure that there is still
// resolution and there is the expected number of routes.
createSrv := func(name string, port int) *Server {
o := DefaultOptions()
o.Port = -1
o.ServerName = name
o.Cluster.PoolSize = test.poolSize
o.Cluster.Name = "local"
o.Cluster.Port = port
o.Routes = RoutesFromStr("nats://127.0.0.1:1234,nats://127.0.0.1:1235,nats://127.0.0.1:1236")
s, err := NewServer(o)
if err != nil {
t.Fatalf("Error creating server: %v", err)
}
return s
}
s1 := createSrv("A", 1234)
s2 := createSrv("B", 1235)
s3 := createSrv("C", 1236)

l := &captureDebugLogger{dbgCh: make(chan string, 100)}
s1.SetLogger(l, true, false)
l := &captureDebugLogger{dbgCh: make(chan string, 100)}
s1.SetLogger(l, true, false)

servers := []*Server{s1, s2, s3}
servers := []*Server{s1, s2, s3}

for _, s := range servers {
go s.Start()
defer s.Shutdown()
}
for _, s := range servers {
go s.Start()
defer s.Shutdown()
}

checkClusterFormed(t, s1, s2, s3)
checkClusterFormed(t, s1, s2, s3)

for done, duplicate := false, 0; !done; {
select {
case e := <-l.dbgCh:
if strings.Contains(e, "duplicate") {
if duplicate++; duplicate > 20 {
t.Fatalf("Routes are constantly reconnecting: %v", e)
for done, duplicate := false, 0; !done; {
select {
case e := <-l.dbgCh:
if strings.Contains(e, "duplicate") {
if duplicate++; duplicate > 20 {
t.Fatalf("Routes are constantly reconnecting: %v", e)
}
}
case <-time.After(DEFAULT_ROUTE_RECONNECT + 250*time.Millisecond):
// More than reconnect and some, and no reconnect, so we are good.
done = true
}
}
case <-time.After(DEFAULT_ROUTE_RECONNECT + 250*time.Millisecond):
// More than reconnect and some, and no reconnect, so we are good.
done = true
}
}

// Also, check that they all report as solicited and configured in monitoring.
for _, s := range servers {
routes, err := s.Routez(nil)
require_NoError(t, err)
for _, r := range routes.Routes {
if !r.DidSolicit {
t.Fatalf("All routes should have been marked as solicited, this one was not: %+v", r)
}
if !r.IsConfigured {
t.Fatalf("All routes should have been marked as configured, this one was not: %+v", r)
// Also, check that they all report as solicited and configured in monitoring.
for _, s := range servers {
routes, err := s.Routez(nil)
require_NoError(t, err)
for _, r := range routes.Routes {
if !r.DidSolicit {
t.Fatalf("All routes should have been marked as solicited, this one was not: %+v", r)
}
if !r.IsConfigured {
t.Fatalf("All routes should have been marked as configured, this one was not: %+v", r)
}
}
}
}
}

for _, s := range servers {
s.Shutdown()
s.WaitForShutdown()
for _, s := range servers {
s.Shutdown()
s.WaitForShutdown()
}
})
}
}

Expand Down

0 comments on commit 4bad329

Please sign in to comment.