Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

les: handle conn/disc/reg logic in the eventloop #16981

Merged
merged 4 commits into from
Jun 25, 2018
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
209 changes: 135 additions & 74 deletions les/serverpool.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,27 @@ const (
initStatsWeight = 1
)

// connReq represents a request for peer connection.
type connReq struct {
p *peer
ip net.IP
port uint16
result chan *poolEntry
}

// disconnReq represents a request for peer disconnection.
type disconnReq struct {
entry *poolEntry
stopped bool
done chan struct{}
}

// registerReq represents a request for peer registration.
type registerReq struct {
entry *poolEntry
done chan struct{}
}

// serverPool implements a pool for storing and selecting newly discovered and already
// known light server nodes. It received discovered nodes, stores statistics about
// known nodes and takes care of always having enough good quality servers connected.
Expand All @@ -105,10 +126,13 @@ type serverPool struct {
discLookups chan bool

entries map[discover.NodeID]*poolEntry
lock sync.Mutex
timeout, enableRetry chan *poolEntry
adjustStats chan poolStatAdjust

connCh chan *connReq
disconnCh chan *disconnReq
registerCh chan *registerReq

knownQueue, newQueue poolEntryQueue
knownSelect, newSelect *weightedRandomSelect
knownSelected, newSelected int
Expand All @@ -125,6 +149,9 @@ func newServerPool(db ethdb.Database, quit chan struct{}, wg *sync.WaitGroup) *s
timeout: make(chan *poolEntry, 1),
adjustStats: make(chan poolStatAdjust, 100),
enableRetry: make(chan *poolEntry, 1),
connCh: make(chan *connReq),
disconnCh: make(chan *disconnReq),
registerCh: make(chan *registerReq),
knownSelect: newWeightedRandomSelect(),
newSelect: newWeightedRandomSelect(),
fastDiscover: true,
Expand All @@ -147,9 +174,8 @@ func (pool *serverPool) start(server *p2p.Server, topic discv5.Topic) {
pool.discLookups = make(chan bool, 100)
go pool.server.DiscV5.SearchTopic(pool.topic, pool.discSetPeriod, pool.discNodes, pool.discLookups)
}

go pool.eventLoop()
pool.checkDial()
go pool.eventLoop()
}

// connect should be called upon any incoming connection. If the connection has been
Expand All @@ -158,83 +184,44 @@ func (pool *serverPool) start(server *p2p.Server, topic discv5.Topic) {
// Note that whenever a connection has been accepted and a pool entry has been returned,
// disconnect should also always be called.
func (pool *serverPool) connect(p *peer, ip net.IP, port uint16) *poolEntry {
pool.lock.Lock()
defer pool.lock.Unlock()
entry := pool.entries[p.ID()]
if entry == nil {
entry = pool.findOrNewNode(p.ID(), ip, port)
}
p.Log().Debug("Connecting to new peer", "state", entry.state)
if entry.state == psConnected || entry.state == psRegistered {
log.Debug("Connect new entry", "enode", p.id)
req := &connReq{p: p, ip: ip, port: port, result: make(chan *poolEntry, 1)}
select {
case pool.connCh <- req:
case <-pool.quit:
return nil
}
pool.connWg.Add(1)
entry.peer = p
entry.state = psConnected
addr := &poolEntryAddress{
ip: ip,
port: port,
lastSeen: mclock.Now(),
}
entry.lastConnected = addr
entry.addr = make(map[string]*poolEntryAddress)
entry.addr[addr.strKey()] = addr
entry.addrSelect = *newWeightedRandomSelect()
entry.addrSelect.update(addr)
return entry
return <-req.result
}

// registered should be called after a successful handshake
func (pool *serverPool) registered(entry *poolEntry) {
log.Debug("Registered new entry", "enode", entry.id)
pool.lock.Lock()
defer pool.lock.Unlock()

entry.state = psRegistered
entry.regTime = mclock.Now()
if !entry.known {
pool.newQueue.remove(entry)
entry.known = true
req := &registerReq{entry: entry, done: make(chan struct{})}
select {
case pool.registerCh <- req:
case <-pool.quit:
return
}
pool.knownQueue.setLatest(entry)
entry.shortRetry = shortRetryCnt
<-req.done
}

// disconnect should be called when ending a connection. Service quality statistics
// can be updated optionally (not updated if no registration happened, in this case
// only connection statistics are updated, just like in case of timeout)
func (pool *serverPool) disconnect(entry *poolEntry) {
log.Debug("Disconnected old entry", "enode", entry.id)
pool.lock.Lock()
defer pool.lock.Unlock()

if entry.state == psRegistered {
connTime := mclock.Now() - entry.regTime
connAdjust := float64(connTime) / float64(targetConnTime)
if connAdjust > 1 {
connAdjust = 1
}
stopped := false
select {
case <-pool.quit:
stopped = true
default:
}
if stopped {
entry.connectStats.add(1, connAdjust)
} else {
entry.connectStats.add(connAdjust, 1)
}
stopped := false
select {
case <-pool.quit:
stopped = true
default:
}
log.Debug("Disconnected old entry", "enode", entry.id)
req := &disconnReq{entry: entry, stopped: stopped, done: make(chan struct{})}

entry.state = psNotConnected
if entry.knownSelected {
pool.knownSelected--
} else {
pool.newSelected--
}
pool.setRetryDial(entry)
pool.connWg.Done()
// Block until disconnection request is served.
pool.disconnCh <- req
<-req.done
}

const (
Expand Down Expand Up @@ -277,25 +264,51 @@ func (pool *serverPool) eventLoop() {
if pool.discSetPeriod != nil {
pool.discSetPeriod <- time.Millisecond * 100
}

// disconnect updates service quality statistics depending on the connection time
// and disconnection initiator.
disconnect := func(req *disconnReq, stopped bool) {
// Handle peer disconnection requests.
entry := req.entry
if entry.state == psRegistered {
connAdjust := float64(mclock.Now()-entry.regTime) / float64(targetConnTime)
if connAdjust > 1 {
connAdjust = 1
}
if stopped {
// disconnect requested by ourselves.
entry.connectStats.add(1, connAdjust)
} else {
// disconnect requested by server side.
entry.connectStats.add(connAdjust, 1)
}
}
entry.state = psNotConnected

if entry.knownSelected {
pool.knownSelected--
} else {
pool.newSelected--
}
pool.setRetryDial(entry)
pool.connWg.Done()
close(req.done)
}

for {
select {
case entry := <-pool.timeout:
pool.lock.Lock()
if !entry.removed {
pool.checkDialTimeout(entry)
}
pool.lock.Unlock()

case entry := <-pool.enableRetry:
pool.lock.Lock()
if !entry.removed {
entry.delayedRetry = false
pool.updateCheckDial(entry)
}
pool.lock.Unlock()

case adj := <-pool.adjustStats:
pool.lock.Lock()
switch adj.adjustType {
case pseBlockDelay:
adj.entry.delayStats.add(float64(adj.time), 1)
Expand All @@ -305,13 +318,10 @@ func (pool *serverPool) eventLoop() {
case pseResponseTimeout:
adj.entry.timeoutStats.add(1, 1)
}
pool.lock.Unlock()

case node := <-pool.discNodes:
pool.lock.Lock()
entry := pool.findOrNewNode(discover.NodeID(node.ID), node.IP, node.TCP)
pool.updateCheckDial(entry)
pool.lock.Unlock()

case conv := <-pool.discLookups:
if conv {
Expand All @@ -327,15 +337,66 @@ func (pool *serverPool) eventLoop() {
}
}

case req := <-pool.connCh:
// Handle peer connection requests.
entry := pool.entries[req.p.ID()]
if entry == nil {
entry = pool.findOrNewNode(req.p.ID(), req.ip, req.port)
}
if entry.state == psConnected || entry.state == psRegistered {
req.result <- nil
continue
}
pool.connWg.Add(1)
entry.peer = req.p
entry.state = psConnected
addr := &poolEntryAddress{
ip: req.ip,
port: req.port,
lastSeen: mclock.Now(),
}
entry.lastConnected = addr
entry.addr = make(map[string]*poolEntryAddress)
entry.addr[addr.strKey()] = addr
entry.addrSelect = *newWeightedRandomSelect()
entry.addrSelect.update(addr)
req.result <- entry

case req := <-pool.registerCh:
// Handle peer registration requests.
entry := req.entry
entry.state = psRegistered
entry.regTime = mclock.Now()
if !entry.known {
pool.newQueue.remove(entry)
entry.known = true
}
pool.knownQueue.setLatest(entry)
entry.shortRetry = shortRetryCnt
close(req.done)

case req := <-pool.disconnCh:
// Handle peer disconnection requests.
disconnect(req, req.stopped)

case <-pool.quit:
if pool.discSetPeriod != nil {
close(pool.discSetPeriod)
}
pool.connWg.Wait()

// Spawn a goroutine to close the disconnCh after all connections are disconnected.
go func() {
pool.connWg.Wait()
close(pool.disconnCh)
}()

// Handle all remaining disconnection requests before exit.
for req := range pool.disconnCh {
disconnect(req, true)
}
pool.saveNodes()
pool.wg.Done()
return

}
}
}
Expand Down