Skip to content

Commit

Permalink
p2p: refactor MaxPendingPeers handling
Browse files Browse the repository at this point in the history
* use semaphore instead of a chan struct{}
* move MaxPendingPeers default value to DefaultConfig.P2P
* log Error if Accept fails
* replace quit channel with context
  • Loading branch information
battlmonstr committed Apr 27, 2022
1 parent c9b26c2 commit ea6074f
Show file tree
Hide file tree
Showing 6 changed files with 101 additions and 85 deletions.
2 changes: 1 addition & 1 deletion cmd/utils/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -491,7 +491,7 @@ var (
}
MaxPendingPeersFlag = cli.IntFlag{
Name: "maxpendpeers",
Usage: "Maximum number of pending connection attempts (defaults used if set to 0)",
Usage: "Maximum number of TCP connections pending to become connected peers",
Value: node.DefaultConfig.P2P.MaxPendingPeers,
}
ListenPortFlag = cli.IntFlag{
Expand Down
9 changes: 5 additions & 4 deletions node/defaults.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,10 @@ var DefaultConfig = Config{
WSPort: DefaultWSPort,
WSModules: []string{"net", "web3"},
P2P: p2p.Config{
ListenAddr: ":30303",
ListenAddr65: ":30304",
MaxPeers: 100,
NAT: nat.Any(),
ListenAddr: ":30303",
ListenAddr65: ":30304",
MaxPeers: 100,
MaxPendingPeers: 50,
NAT: nat.Any(),
},
}
3 changes: 0 additions & 3 deletions p2p/dial.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,9 +146,6 @@ type dialConfig struct {
}

func (cfg dialConfig) withDefaults() dialConfig {
if cfg.maxActiveDials == 0 {
cfg.maxActiveDials = defaultMaxPendingPeers
}
if cfg.log == nil {
cfg.log = log.Root()
}
Expand Down
82 changes: 46 additions & 36 deletions p2p/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"encoding/hex"
"errors"
"fmt"
"golang.org/x/sync/semaphore"
"net"
"sort"
"sync"
Expand Down Expand Up @@ -52,8 +53,7 @@ const (
discmixTimeout = 5 * time.Second

// Connectivity defaults.
defaultMaxPendingPeers = 50
defaultDialRatio = 3
defaultDialRatio = 3

// This time limits inbound connection attempts per source IP.
inboundThrottleTime = 30 * time.Second
Expand All @@ -79,7 +79,7 @@ type Config struct {

// MaxPendingPeers is the maximum number of peers that can be pending in the
// handshake phase, counted separately for inbound and outbound connections.
// Zero defaults to preset values.
// It must be greater than zero.
MaxPendingPeers int `toml:",omitempty"`

// DialRatio controls the ratio of inbound to dialed connections.
Expand Down Expand Up @@ -191,7 +191,9 @@ type Server struct {
dialsched *dialScheduler

// Channels into the run loop.
quit chan struct{}
quitCtx context.Context
quitFunc context.CancelFunc
quit <-chan struct{}
addtrusted chan *enode.Node
removetrusted chan *enode.Node
peerOp chan peerOpFunc
Expand Down Expand Up @@ -409,10 +411,10 @@ func (srv *Server) Stop() {
return
}
srv.running = false
close(srv.quit)
srv.quitFunc()
if srv.listener != nil {
// this unblocks listener Accept
srv.listener.Close()
_ = srv.listener.Close()
}
if srv.nodedb != nil {
srv.nodedb.Close()
Expand Down Expand Up @@ -476,13 +478,17 @@ func (srv *Server) Start(ctx context.Context) error {
if srv.PrivateKey == nil {
return errors.New("Server.PrivateKey must be set to a non-nil key")
}
if srv.MaxPendingPeers <= 0 {
return errors.New("MaxPendingPeers must be greater than zero")
}
if srv.newTransport == nil {
srv.newTransport = newRLPX
}
if srv.listenFunc == nil {
srv.listenFunc = net.Listen
}
srv.quit = make(chan struct{})
srv.quitCtx, srv.quitFunc = context.WithCancel(ctx)
srv.quit = srv.quitCtx.Done()
srv.delpeer = make(chan peerDrop)
srv.checkpointPostHandshake = make(chan *conn)
srv.checkpointAddPeer = make(chan *conn)
Expand All @@ -495,11 +501,11 @@ func (srv *Server) Start(ctx context.Context) error {
return err
}
if srv.ListenAddr != "" {
if err := srv.setupListening(); err != nil {
if err := srv.setupListening(srv.quitCtx); err != nil {
return err
}
}
if err := srv.setupDiscovery(ctx); err != nil {
if err := srv.setupDiscovery(srv.quitCtx); err != nil {
return err
}
srv.setupDialScheduler()
Expand Down Expand Up @@ -586,8 +592,8 @@ func (srv *Server) setupDiscovery(ctx context.Context) error {
srv.loopWG.Add(1)
go func() {
defer debug.LogPanic()
defer srv.loopWG.Done()
nat.Map(srv.NAT, srv.quit, "udp", realaddr.Port, realaddr.Port, "ethereum discovery")
srv.loopWG.Done()
}()
}
}
Expand Down Expand Up @@ -682,7 +688,7 @@ func (srv *Server) maxDialedConns() (limit int) {
return limit
}

func (srv *Server) setupListening() error {
func (srv *Server) setupListening(ctx context.Context) error {
// Launch the listener.
listener, err := srv.listenFunc("tcp", srv.ListenAddr)
if err != nil {
Expand All @@ -698,14 +704,18 @@ func (srv *Server) setupListening() error {
srv.loopWG.Add(1)
go func() {
defer debug.LogPanic()
defer srv.loopWG.Done()
nat.Map(srv.NAT, srv.quit, "tcp", tcp.Port, tcp.Port, "ethereum p2p")
srv.loopWG.Done()
}()
}
}

srv.loopWG.Add(1)
go srv.listenLoop()
go func() {
defer debug.LogPanic()
defer srv.loopWG.Done()
srv.listenLoop(ctx)
}()
return nil
}

Expand Down Expand Up @@ -857,32 +867,26 @@ func (srv *Server) addPeerChecks(peers map[enode.ID]*Peer, inboundCount int, c *

// listenLoop runs in its own goroutine and accepts
// inbound connections.
func (srv *Server) listenLoop() {
defer debug.LogPanic()
func (srv *Server) listenLoop(ctx context.Context) {
srv.log.Trace("TCP listener up", "addr", srv.listener.Addr())

// The slots channel limits accepts of new connections.
tokens := defaultMaxPendingPeers
if srv.MaxPendingPeers > 0 {
tokens = srv.MaxPendingPeers
}
slots := make(chan struct{}, tokens)
for i := 0; i < tokens; i++ {
slots <- struct{}{}
}
// The slots limit accepts of new connections.
slots := semaphore.NewWeighted(int64(srv.MaxPendingPeers))

// Wait for slots to be returned on exit. This ensures all connection goroutines
// are down before listenLoop returns.
defer srv.loopWG.Done()
defer func() {
for i := 0; i < cap(slots); i++ {
<-slots
}
_ = slots.Acquire(ctx, int64(srv.MaxPendingPeers))
}()

for {
// Wait for a free slot before accepting.
<-slots
if slotErr := slots.Acquire(ctx, 1); slotErr != nil {
if !errors.Is(slotErr, context.Canceled) {
srv.log.Error("Failed to get a peer connection slot", "err", slotErr)
}
return
}

var (
fd net.Conn
Expand All @@ -899,18 +903,23 @@ func (srv *Server) listenLoop() {
time.Sleep(time.Millisecond * 200)
continue
} else if err != nil {
srv.log.Trace("Read error", "err", err)
slots <- struct{}{}
// Log the error unless the server is shutting down.
select {
case <-srv.quit:
default:
srv.log.Error("Server listener failed to accept a connection", "err", err)
}
slots.Release(1)
return
}
break
}

remoteIP := netutil.AddrIP(fd.RemoteAddr())
if err := srv.checkInboundConn(fd, remoteIP); err != nil {
srv.log.Trace("Rejected inbound connnection", "addr", fd.RemoteAddr(), "err", err)
fd.Close()
slots <- struct{}{}
srv.log.Trace("Rejected inbound connection", "addr", fd.RemoteAddr(), "err", err)
_ = fd.Close()
slots.Release(1)
continue
}
if remoteIP != nil {
Expand All @@ -923,8 +932,9 @@ func (srv *Server) listenLoop() {
}
go func() {
defer debug.LogPanic()
srv.SetupConn(fd, inboundConn, nil)
slots <- struct{}{}
defer slots.Release(1)
// The error is logged in Server.setupConn().
_ = srv.SetupConn(fd, inboundConn, nil)
}()
}
}
Expand Down
89 changes: 48 additions & 41 deletions p2p/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,12 +69,13 @@ func (c *testTransport) close(err error) {

func startTestServer(t *testing.T, remoteKey *ecdsa.PublicKey, pf func(*Peer)) *Server {
config := Config{
Name: "test",
MaxPeers: 10,
ListenAddr: "127.0.0.1:0",
NoDiscovery: true,
PrivateKey: newkey(),
Log: testlog.Logger(t, log.LvlError),
Name: "test",
MaxPeers: 10,
MaxPendingPeers: 10,
ListenAddr: "127.0.0.1:0",
NoDiscovery: true,
PrivateKey: newkey(),
Log: testlog.Logger(t, log.LvlError),
}
server := &Server{
Config: config,
Expand Down Expand Up @@ -211,18 +212,20 @@ func TestServerDial(t *testing.T) {
// This test checks that RemovePeer disconnects the peer if it is connected.
func TestServerRemovePeerDisconnect(t *testing.T) {
srv1 := &Server{Config: Config{
PrivateKey: newkey(),
MaxPeers: 1,
NoDiscovery: true,
Log: testlog.Logger(t, log.LvlTrace).New("server", "1"),
PrivateKey: newkey(),
MaxPeers: 1,
MaxPendingPeers: 1,
NoDiscovery: true,
Log: testlog.Logger(t, log.LvlTrace).New("server", "1"),
}}
srv2 := &Server{Config: Config{
PrivateKey: newkey(),
MaxPeers: 1,
NoDiscovery: true,
NoDial: true,
ListenAddr: "127.0.0.1:0",
Log: testlog.Logger(t, log.LvlTrace).New("server", "2"),
PrivateKey: newkey(),
MaxPeers: 1,
MaxPendingPeers: 1,
NoDiscovery: true,
NoDial: true,
ListenAddr: "127.0.0.1:0",
Log: testlog.Logger(t, log.LvlTrace).New("server", "2"),
}}
if err := srv1.TestStart(); err != nil {
t.Fatal("cant start srv1")
Expand All @@ -249,12 +252,13 @@ func TestServerAtCap(t *testing.T) {
trustedID := enode.PubkeyToIDV4(&trustedNode.PublicKey)
srv := &Server{
Config: Config{
PrivateKey: newkey(),
MaxPeers: 10,
NoDial: true,
NoDiscovery: true,
TrustedNodes: []*enode.Node{newNode(trustedID, "")},
Log: testlog.Logger(t, log.LvlTrace),
PrivateKey: newkey(),
MaxPeers: 10,
MaxPendingPeers: 10,
NoDial: true,
NoDiscovery: true,
TrustedNodes: []*enode.Node{newNode(trustedID, "")},
Log: testlog.Logger(t, log.LvlTrace),
},
}
if err := srv.TestStart(); err != nil {
Expand Down Expand Up @@ -325,12 +329,13 @@ func TestServerPeerLimits(t *testing.T) {

srv := &Server{
Config: Config{
PrivateKey: srvkey,
MaxPeers: 0,
NoDial: true,
NoDiscovery: true,
Protocols: []Protocol{discard},
Log: testlog.Logger(t, log.LvlTrace),
PrivateKey: srvkey,
MaxPeers: 0,
MaxPendingPeers: 50,
NoDial: true,
NoDiscovery: true,
Protocols: []Protocol{discard},
Log: testlog.Logger(t, log.LvlTrace),
},
newTransport: func(fd net.Conn, dialDest *ecdsa.PublicKey) transport { return tp },
}
Expand Down Expand Up @@ -432,12 +437,13 @@ func TestServerSetupConn(t *testing.T) {
for i, test := range tests {
t.Run(test.wantCalls, func(t *testing.T) {
cfg := Config{
PrivateKey: srvkey,
MaxPeers: 10,
NoDial: true,
NoDiscovery: true,
Protocols: []Protocol{discard},
Log: testlog.Logger(t, log.LvlTrace),
PrivateKey: srvkey,
MaxPeers: 10,
MaxPendingPeers: 10,
NoDial: true,
NoDiscovery: true,
Protocols: []Protocol{discard},
Log: testlog.Logger(t, log.LvlTrace),
}
srv := &Server{
Config: cfg,
Expand Down Expand Up @@ -518,13 +524,14 @@ func TestServerInboundThrottle(t *testing.T) {
newTransportCalled := make(chan struct{})
srv := &Server{
Config: Config{
PrivateKey: newkey(),
ListenAddr: "127.0.0.1:0",
MaxPeers: 10,
NoDial: true,
NoDiscovery: true,
Protocols: []Protocol{discard},
Log: testlog.Logger(t, log.LvlTrace),
PrivateKey: newkey(),
ListenAddr: "127.0.0.1:0",
MaxPeers: 10,
MaxPendingPeers: 10,
NoDial: true,
NoDiscovery: true,
Protocols: []Protocol{discard},
Log: testlog.Logger(t, log.LvlTrace),
},
newTransport: func(fd net.Conn, dialDest *ecdsa.PublicKey) transport {
newTransportCalled <- struct{}{}
Expand Down
1 change: 1 addition & 0 deletions p2p/simulations/adapters/inproc.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ func (s *SimAdapter) NewNode(config *NodeConfig) (Node, error) {
P2P: p2p.Config{
PrivateKey: config.PrivateKey,
MaxPeers: math.MaxInt32,
MaxPendingPeers: 50,
NoDiscovery: true,
Dialer: s,
EnableMsgEvents: config.EnableMsgEvents,
Expand Down

0 comments on commit ea6074f

Please sign in to comment.