From b434d767e20fc9561f0dc5173713ee670ad64efd Mon Sep 17 00:00:00 2001 From: rjl493456442 Date: Thu, 14 Jun 2018 17:46:26 +0800 Subject: [PATCH 1/4] les: handle conn/disc/reg logic in the eventloop --- les/serverpool.go | 193 ++++++++++++++++++++++++++++++++-------------- 1 file changed, 135 insertions(+), 58 deletions(-) diff --git a/les/serverpool.go b/les/serverpool.go index a39f8835542e..01665512d674 100644 --- a/les/serverpool.go +++ b/les/serverpool.go @@ -87,6 +87,26 @@ const ( initStatsWeight = 1 ) +// connReq represents a request for peer connection. +type connReq struct { + p *peer + ip net.IP + port uint16 + cont chan *poolEntry +} + +// discReq represents a request for peer disconnection. +type discReq struct { + entry *poolEntry + cont chan struct{} +} + +// registerReq represents a request for peer registration. +type registerReq struct { + entry *poolEntry + cont chan struct{} +} + // serverPool implements a pool for storing and selecting newly discovered and already // known light server nodes. It received discovered nodes, stores statistics about // known nodes and takes care of always having enough good quality servers connected. 
@@ -109,6 +129,10 @@ type serverPool struct { timeout, enableRetry chan *poolEntry adjustStats chan poolStatAdjust + connCh chan *connReq + discCh chan *discReq + registerCh chan *registerReq + knownQueue, newQueue poolEntryQueue knownSelect, newSelect *weightedRandomSelect knownSelected, newSelected int @@ -125,6 +149,9 @@ func newServerPool(db ethdb.Database, quit chan struct{}, wg *sync.WaitGroup) *s timeout: make(chan *poolEntry, 1), adjustStats: make(chan poolStatAdjust, 100), enableRetry: make(chan *poolEntry, 1), + connCh: make(chan *connReq), + discCh: make(chan *discReq), + registerCh: make(chan *registerReq), knownSelect: newWeightedRandomSelect(), newSelect: newWeightedRandomSelect(), fastDiscover: true, @@ -158,46 +185,27 @@ func (pool *serverPool) start(server *p2p.Server, topic discv5.Topic) { // Note that whenever a connection has been accepted and a pool entry has been returned, // disconnect should also always be called. func (pool *serverPool) connect(p *peer, ip net.IP, port uint16) *poolEntry { - pool.lock.Lock() - defer pool.lock.Unlock() - entry := pool.entries[p.ID()] - if entry == nil { - entry = pool.findOrNewNode(p.ID(), ip, port) - } - p.Log().Debug("Connecting to new peer", "state", entry.state) - if entry.state == psConnected || entry.state == psRegistered { + log.Debug("Connect new entry", "enode", p.id) + req := &connReq{p: p, ip: ip, port: port, cont: make(chan *poolEntry, 1)} + select { + case pool.connCh <- req: + case <-pool.quit: return nil } - pool.connWg.Add(1) - entry.peer = p - entry.state = psConnected - addr := &poolEntryAddress{ - ip: ip, - port: port, - lastSeen: mclock.Now(), - } - entry.lastConnected = addr - entry.addr = make(map[string]*poolEntryAddress) - entry.addr[addr.strKey()] = addr - entry.addrSelect = *newWeightedRandomSelect() - entry.addrSelect.update(addr) - return entry + return <-req.cont } // registered should be called after a successful handshake func (pool *serverPool) registered(entry *poolEntry) { 
log.Debug("Registered new entry", "enode", entry.id) - pool.lock.Lock() - defer pool.lock.Unlock() - - entry.state = psRegistered - entry.regTime = mclock.Now() - if !entry.known { - pool.newQueue.remove(entry) - entry.known = true + req := &registerReq{entry: entry, cont: make(chan struct{})} + select { + case pool.registerCh <- req: + case <-pool.quit: + return } - pool.knownQueue.setLatest(entry) - entry.shortRetry = shortRetryCnt + <-req.cont + return } // disconnect should be called when ending a connection. Service quality statistics @@ -205,36 +213,45 @@ func (pool *serverPool) registered(entry *poolEntry) { // only connection statistics are updated, just like in case of timeout) func (pool *serverPool) disconnect(entry *poolEntry) { log.Debug("Disconnected old entry", "enode", entry.id) - pool.lock.Lock() - defer pool.lock.Unlock() - - if entry.state == psRegistered { - connTime := mclock.Now() - entry.regTime - connAdjust := float64(connTime) / float64(targetConnTime) - if connAdjust > 1 { - connAdjust = 1 - } - stopped := false - select { - case <-pool.quit: - stopped = true - default: - } - if stopped { + stopped := false + select { + case <-pool.quit: + stopped = true + default: + } + + if stopped { + // Request is emitted by ourselves, handle the logic here since eventloop doesn't + // serve requests anymore. 
+ pool.lock.Lock() + defer pool.lock.Unlock() + + if entry.state == psRegistered { + connTime := mclock.Now() - entry.regTime + connAdjust := float64(connTime) / float64(targetConnTime) + if connAdjust > 1 { + connAdjust = 1 + } entry.connectStats.add(1, connAdjust) + } + entry.state = psNotConnected + if entry.knownSelected { + pool.knownSelected-- } else { - entry.connectStats.add(connAdjust, 1) + pool.newSelected-- } - } - - entry.state = psNotConnected - if entry.knownSelected { - pool.knownSelected-- + pool.setRetryDial(entry) + pool.connWg.Done() } else { - pool.newSelected-- + // Request is emitted by the server side + req := &discReq{entry: entry, cont: make(chan struct{})} + select { + case pool.discCh <- req: + case <-pool.quit: + return + } + <-req.cont } - pool.setRetryDial(entry) - pool.connWg.Done() } const ( @@ -327,6 +344,67 @@ func (pool *serverPool) eventLoop() { } } + case req := <-pool.connCh: + // Handle peer connection requests. + entry := pool.entries[req.p.ID()] + if entry == nil { + entry = pool.findOrNewNode(req.p.ID(), req.ip, req.port) + } + req.p.Log().Debug("Connecting to new peer", "state", entry.state) + if entry.state == psConnected || entry.state == psRegistered { + req.cont <- nil + continue + } + pool.connWg.Add(1) + entry.peer = req.p + entry.state = psConnected + addr := &poolEntryAddress{ + ip: req.ip, + port: req.port, + lastSeen: mclock.Now(), + } + entry.lastConnected = addr + entry.addr = make(map[string]*poolEntryAddress) + entry.addr[addr.strKey()] = addr + entry.addrSelect = *newWeightedRandomSelect() + entry.addrSelect.update(addr) + req.cont <- entry + + case req := <-pool.registerCh: + // Handle peer registration requests. 
+ entry := req.entry + entry.state = psRegistered + entry.regTime = mclock.Now() + if !entry.known { + pool.newQueue.remove(entry) + entry.known = true + } + pool.knownQueue.setLatest(entry) + entry.shortRetry = shortRetryCnt + close(req.cont) + + case req := <-pool.discCh: + // Handle peer disconnection requests. + entry := req.entry + if entry.state == psRegistered { + connTime := mclock.Now() - entry.regTime + connAdjust := float64(connTime) / float64(targetConnTime) + if connAdjust > 1 { + connAdjust = 1 + } + entry.connectStats.add(connAdjust, 1) + } + + entry.state = psNotConnected + if entry.knownSelected { + pool.knownSelected-- + } else { + pool.newSelected-- + } + pool.setRetryDial(entry) + pool.connWg.Done() + close(req.cont) + case <-pool.quit: if pool.discSetPeriod != nil { close(pool.discSetPeriod) @@ -335,7 +413,6 @@ func (pool *serverPool) eventLoop() { pool.saveNodes() pool.wg.Done() return - } } } From 3f8b3a58e39a7db9725217eda1c7aa8edbf8da4f Mon Sep 17 00:00:00 2001 From: rjl493456442 Date: Tue, 19 Jun 2018 14:28:19 +0800 Subject: [PATCH 2/4] les: try to dial before start eventloop --- les/serverpool.go | 30 ++++-------------------------- 1 file changed, 4 insertions(+), 26 deletions(-) diff --git a/les/serverpool.go b/les/serverpool.go index 01665512d674..e4a367852af8 100644 --- a/les/serverpool.go +++ b/les/serverpool.go @@ -125,7 +125,6 @@ type serverPool struct { discLookups chan bool entries map[discover.NodeID]*poolEntry - lock sync.Mutex timeout, enableRetry chan *poolEntry adjustStats chan poolStatAdjust @@ -174,9 +173,8 @@ func (pool *serverPool) start(server *p2p.Server, topic discv5.Topic) { pool.discLookups = make(chan bool, 100) go pool.server.DiscV5.SearchTopic(pool.topic, pool.discSetPeriod, pool.discNodes, pool.discLookups) } - - go pool.eventLoop() pool.checkDial() + go pool.eventLoop() } // connect should be called upon any incoming connection. 
If the connection has been @@ -223,24 +221,14 @@ func (pool *serverPool) disconnect(entry *poolEntry) { if stopped { // Request is emitted by ourselves, handle the logic here since eventloop doesn't // serve requests anymore. - pool.lock.Lock() - defer pool.lock.Unlock() - if entry.state == psRegistered { - connTime := mclock.Now() - entry.regTime - connAdjust := float64(connTime) / float64(targetConnTime) + connAdjust := float64(mclock.Now()-entry.regTime) / float64(targetConnTime) if connAdjust > 1 { connAdjust = 1 } entry.connectStats.add(1, connAdjust) } entry.state = psNotConnected - if entry.knownSelected { - pool.knownSelected-- - } else { - pool.newSelected-- - } - pool.setRetryDial(entry) pool.connWg.Done() } else { // Request is emitted by the server side @@ -297,22 +285,17 @@ func (pool *serverPool) eventLoop() { for { select { case entry := <-pool.timeout: - pool.lock.Lock() if !entry.removed { pool.checkDialTimeout(entry) } - pool.lock.Unlock() case entry := <-pool.enableRetry: - pool.lock.Lock() if !entry.removed { entry.delayedRetry = false pool.updateCheckDial(entry) } - pool.lock.Unlock() case adj := <-pool.adjustStats: - pool.lock.Lock() switch adj.adjustType { case pseBlockDelay: adj.entry.delayStats.add(float64(adj.time), 1) @@ -322,13 +305,10 @@ func (pool *serverPool) eventLoop() { case pseResponseTimeout: adj.entry.timeoutStats.add(1, 1) } - pool.lock.Unlock() case node := <-pool.discNodes: - pool.lock.Lock() entry := pool.findOrNewNode(discover.NodeID(node.ID), node.IP, node.TCP) pool.updateCheckDial(entry) - pool.lock.Unlock() case conv := <-pool.discLookups: if conv { @@ -350,7 +330,6 @@ func (pool *serverPool) eventLoop() { if entry == nil { entry = pool.findOrNewNode(req.p.ID(), req.ip, req.port) } - req.p.Log().Debug("Connecting to new peer", "state", entry.state) if entry.state == psConnected || entry.state == psRegistered { req.cont <- nil continue @@ -387,15 +366,14 @@ func (pool *serverPool) eventLoop() { // Handle peer 
disconnection requests. entry := req.entry if entry.state == psRegistered { - connTime := mclock.Now() - entry.regTime - connAdjust := float64(connTime) / float64(targetConnTime) + connAdjust := float64(mclock.Now()-entry.regTime) / float64(targetConnTime) if connAdjust > 1 { connAdjust = 1 } entry.connectStats.add(connAdjust, 1) } - entry.state = psNotConnected + if entry.knownSelected { pool.knownSelected-- } else { From 5cf0cfd97196c858ad2f4c0b8b03a7f57fef2030 Mon Sep 17 00:00:00 2001 From: rjl493456442 Date: Tue, 19 Jun 2018 22:30:36 +0800 Subject: [PATCH 3/4] les: handle disconnect logic more safely --- les/serverpool.go | 130 ++++++++++++++++++++++++---------------------- 1 file changed, 68 insertions(+), 62 deletions(-) diff --git a/les/serverpool.go b/les/serverpool.go index e4a367852af8..d490de9e49d8 100644 --- a/les/serverpool.go +++ b/les/serverpool.go @@ -89,22 +89,23 @@ const ( // connReq represents a request for peer connection. type connReq struct { - p *peer - ip net.IP - port uint16 - cont chan *poolEntry + p *peer + ip net.IP + port uint16 + result chan *poolEntry } -// discReq represents a request for peer disconnection. -type discReq struct { - entry *poolEntry - cont chan struct{} +// disconnReq represents a request for peer disconnection. +type disconnReq struct { + entry *poolEntry + stopped bool + done chan struct{} } // registerReq represents a request for peer registration. 
type registerReq struct { entry *poolEntry - cont chan struct{} + done chan struct{} } // serverPool implements a pool for storing and selecting newly discovered and already @@ -129,7 +130,7 @@ type serverPool struct { adjustStats chan poolStatAdjust connCh chan *connReq - discCh chan *discReq + disconnCh chan *disconnReq registerCh chan *registerReq knownQueue, newQueue poolEntryQueue @@ -149,7 +150,7 @@ func newServerPool(db ethdb.Database, quit chan struct{}, wg *sync.WaitGroup) *s adjustStats: make(chan poolStatAdjust, 100), enableRetry: make(chan *poolEntry, 1), connCh: make(chan *connReq), - discCh: make(chan *discReq), + disconnCh: make(chan *disconnReq), registerCh: make(chan *registerReq), knownSelect: newWeightedRandomSelect(), newSelect: newWeightedRandomSelect(), @@ -184,62 +185,43 @@ func (pool *serverPool) start(server *p2p.Server, topic discv5.Topic) { // disconnect should also always be called. func (pool *serverPool) connect(p *peer, ip net.IP, port uint16) *poolEntry { log.Debug("Connect new entry", "enode", p.id) - req := &connReq{p: p, ip: ip, port: port, cont: make(chan *poolEntry, 1)} + req := &connReq{p: p, ip: ip, port: port, result: make(chan *poolEntry, 1)} select { case pool.connCh <- req: case <-pool.quit: return nil } - return <-req.cont + return <-req.result } // registered should be called after a successful handshake func (pool *serverPool) registered(entry *poolEntry) { log.Debug("Registered new entry", "enode", entry.id) - req := &registerReq{entry: entry, cont: make(chan struct{})} + req := &registerReq{entry: entry, done: make(chan struct{})} select { case pool.registerCh <- req: case <-pool.quit: return } - <-req.cont - return + <-req.done } // disconnect should be called when ending a connection. 
Service quality statistics // can be updated optionally (not updated if no registration happened, in this case // only connection statistics are updated, just like in case of timeout) func (pool *serverPool) disconnect(entry *poolEntry) { - log.Debug("Disconnected old entry", "enode", entry.id) stopped := false select { case <-pool.quit: stopped = true default: } + log.Debug("Disconnected old entry", "enode", entry.id) + req := &disconnReq{entry: entry, stopped: stopped, done: make(chan struct{})} - if stopped { - // Request is emitted by ourselves, handle the logic here since eventloop doesn't - // serve requests anymore. - if entry.state == psRegistered { - connAdjust := float64(mclock.Now()-entry.regTime) / float64(targetConnTime) - if connAdjust > 1 { - connAdjust = 1 - } - entry.connectStats.add(1, connAdjust) - } - entry.state = psNotConnected - pool.connWg.Done() - } else { - // Request is emitted by the server side - req := &discReq{entry: entry, cont: make(chan struct{})} - select { - case pool.discCh <- req: - case <-pool.quit: - return - } - <-req.cont - } + // Block until disconnection request be served. + pool.disconnCh <- req + <-req.done } const ( @@ -282,6 +264,37 @@ func (pool *serverPool) eventLoop() { if pool.discSetPeriod != nil { pool.discSetPeriod <- time.Millisecond * 100 } + + // disconnect updates service quality statistics depending on the connection time + // and disconnection initiator. + disconnect := func(req *disconnReq, stopped bool) { + // Handle peer disconnection requests. + entry := req.entry + if entry.state == psRegistered { + connAdjust := float64(mclock.Now()-entry.regTime) / float64(targetConnTime) + if connAdjust > 1 { + connAdjust = 1 + } + if stopped { + // disconnect requested by ourselves. + entry.connectStats.add(1, connAdjust) + } else { + // disconnect requested by server side. 
+ entry.connectStats.add(connAdjust, 1) + } + } + entry.state = psNotConnected + + if entry.knownSelected { + pool.knownSelected-- + } else { + pool.newSelected-- + } + pool.setRetryDial(entry) + pool.connWg.Done() + close(req.done) + } + for { select { case entry := <-pool.timeout: @@ -331,7 +344,7 @@ func (pool *serverPool) eventLoop() { entry = pool.findOrNewNode(req.p.ID(), req.ip, req.port) } if entry.state == psConnected || entry.state == psRegistered { - req.cont <- nil + req.result <- nil continue } pool.connWg.Add(1) @@ -347,7 +360,7 @@ func (pool *serverPool) eventLoop() { entry.addr[addr.strKey()] = addr entry.addrSelect = *newWeightedRandomSelect() entry.addrSelect.update(addr) - req.cont <- entry + req.result <- entry case req := <-pool.registerCh: // Handle peer registration requests. @@ -360,34 +373,27 @@ func (pool *serverPool) eventLoop() { } pool.knownQueue.setLatest(entry) entry.shortRetry = shortRetryCnt - close(req.cont) + close(req.done) - case req := <-pool.discCh: + case req := <-pool.disconnCh: // Handle peer disconnection requests. - entry := req.entry - if entry.state == psRegistered { - connAdjust := float64(mclock.Now()-entry.regTime) / float64(targetConnTime) - if connAdjust > 1 { - connAdjust = 1 - } - entry.connectStats.add(connAdjust, 1) - } - entry.state = psNotConnected - - if entry.knownSelected { - pool.knownSelected-- - } else { - pool.newSelected-- - } - pool.setRetryDial(entry) - pool.connWg.Done() - close(req.cont) + disconnect(req, req.stopped) case <-pool.quit: if pool.discSetPeriod != nil { close(pool.discSetPeriod) } - pool.connWg.Wait() + + // Spawn a goroutine to close the disconnCh until all connections are disconnected. + go func() { + pool.connWg.Wait() + close(pool.disconnCh) + }() + + // Handle all remain disconnection requests before exit. 
+ for req := range pool.disconnCh { + disconnect(req, true) + } pool.saveNodes() pool.wg.Done() return From 094038bd3cde7031b22d890889cf694d915bde88 Mon Sep 17 00:00:00 2001 From: rjl493456442 Date: Mon, 25 Jun 2018 16:19:24 +0800 Subject: [PATCH 4/4] les: grammar fix --- les/serverpool.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/les/serverpool.go b/les/serverpool.go index d490de9e49d8..1a4c75229371 100644 --- a/les/serverpool.go +++ b/les/serverpool.go @@ -219,7 +219,7 @@ func (pool *serverPool) disconnect(entry *poolEntry) { log.Debug("Disconnected old entry", "enode", entry.id) req := &disconnReq{entry: entry, stopped: stopped, done: make(chan struct{})} - // Block until disconnection request be served. + // Block until disconnection request is served. pool.disconnCh <- req <-req.done } @@ -384,13 +384,13 @@ func (pool *serverPool) eventLoop() { close(pool.discSetPeriod) } - // Spawn a goroutine to close the disconnCh until all connections are disconnected. + // Spawn a goroutine to close the disconnCh after all connections are disconnected. go func() { pool.connWg.Wait() close(pool.disconnCh) }() - // Handle all remain disconnection requests before exit. + // Handle all remaining disconnection requests before exit. for req := range pool.disconnCh { disconnect(req, true) }