From 6b0ad051c70975048bfb8a0949e87c07436ce947 Mon Sep 17 00:00:00 2001 From: rjl493456442 Date: Wed, 6 May 2020 19:33:33 +0800 Subject: [PATCH 1/3] les: simplify prenegfilter --- les/lespay/client/prenegfilter.go | 73 +++++++++----------------- les/lespay/client/prenegfilter_test.go | 24 +++++---- les/serverpool.go | 6 +-- 3 files changed, 41 insertions(+), 62 deletions(-) diff --git a/les/lespay/client/prenegfilter.go b/les/lespay/client/prenegfilter.go index 0845a8907cdc..6ac44ef56486 100644 --- a/les/lespay/client/prenegfilter.go +++ b/les/lespay/client/prenegfilter.go @@ -20,7 +20,6 @@ import ( "sync" "time" - "github.com/ethereum/go-ethereum/common/mclock" "github.com/ethereum/go-ethereum/p2p/enode" "github.com/ethereum/go-ethereum/p2p/nodestate" ) @@ -35,17 +34,14 @@ type PreNegFilter struct { queryTimeout, canDialTimeout time.Duration input, canDialIter enode.Iterator query PreNegQuery - pending map[*enode.Node]func() - pendingQueries, needQueries int + pending map[*enode.Node]struct{} + needQueries int maxPendingQueries, canDialCount int waitingForNext, closed bool - testClock *mclock.Simulated } // PreNegQuery callback performs connection pre-negotiation. -// Note: the result callback function should always be called, if it has not been called -// before then cancel should call it. -type PreNegQuery func(n *enode.Node, result func(canDial bool)) (start, cancel func()) +type PreNegQuery func(n *enode.Node, result func(canDial bool)) func() // NewPreNegFilter creates a new PreNegFilter. sfQueried is set for each queried node, sfCanDial // is set together with sfQueried being reset if the callback returned a positive answer. The output @@ -55,9 +51,7 @@ type PreNegQuery func(n *enode.Node, result func(canDial bool)) (start, cancel f // with an active sfCanDial flag and the output iterator is already being read. Note that until // sfCanDial is reset or times out the filter won't start more queries even if the dial candidate // has been returned by the output iterator. -// If a simulated clock is used for testing then it should be provided in order to advance -// clock when waiting for query results. -func NewPreNegFilter(ns *nodestate.NodeStateMachine, input enode.Iterator, query PreNegQuery, sfQueried, sfCanDial nodestate.Flags, maxPendingQueries int, queryTimeout, canDialTimeout time.Duration, testClock *mclock.Simulated) *PreNegFilter { +func NewPreNegFilter(ns *nodestate.NodeStateMachine, input enode.Iterator, query PreNegQuery, sfQueried, sfCanDial nodestate.Flags, maxPendingQueries int, queryTimeout, canDialTimeout time.Duration) *PreNegFilter { pf := &PreNegFilter{ ns: ns, input: input, @@ -68,39 +62,38 @@ func NewPreNegFilter(ns *nodestate.NodeStateMachine, input enode.Iterator, query canDialTimeout: canDialTimeout, maxPendingQueries: maxPendingQueries, canDialIter: NewQueueIterator(ns, sfCanDial, nodestate.Flags{}, false), - pending: make(map[*enode.Node]func()), - testClock: testClock, + pending: make(map[*enode.Node]struct{}), } pf.cond = sync.NewCond(&pf.lock) ns.SubscribeState(sfQueried.Or(sfCanDial), func(n *enode.Node, oldState, newState nodestate.Flags) { - var cancel func() pf.lock.Lock() + defer pf.lock.Unlock() + + // Maintain the 'canDial' result counter if oldState.HasAll(sfCanDial) { pf.canDialCount-- } if newState.HasAll(sfCanDial) { pf.canDialCount++ } - pf.checkQuery() + // Query timeout, remove it from the pending set and spin up one more query. if oldState.HasAll(sfQueried) && newState.HasNone(sfQueried.Or(sfCanDial)) { - // query timeout, call cancel function (the query result callback will remove it from the map) - cancel = pf.pending[n] - } - pf.lock.Unlock() - if cancel != nil { - cancel() + if _, exist := pf.pending[n]; exist { + delete(pf.pending, n) + pf.checkQuery() + } } }) go pf.readLoop() return pf } -// checkQuery checks whether we need more queries and signals readLoop if necessary +// checkQuery checks whether we need more queries and signals readLoop if necessary. func (pf *PreNegFilter) checkQuery() { if pf.waitingForNext && pf.canDialCount == 0 { pf.needQueries = pf.maxPendingQueries } - if pf.needQueries > pf.pendingQueries { + if pf.needQueries > len(pf.pending) { pf.cond.Signal() } } @@ -109,21 +102,9 @@ func (pf *PreNegFilter) checkQuery() { func (pf *PreNegFilter) readLoop() { for { pf.lock.Lock() - if pf.testClock != nil { - for pf.pendingQueries == pf.maxPendingQueries { - // advance simulated clock until our queries are finished or timed out - pf.lock.Unlock() - pf.testClock.Run(time.Second) - pf.lock.Lock() - if pf.closed { - pf.lock.Unlock() - return - } - } - } - for pf.needQueries <= pf.pendingQueries { - // either no queries are needed or we have enough pending; wait until more - // are needed + for pf.needQueries <= len(pf.pending) { + // either no queries are needed or we have enough pending; + // wait until more are needed pf.cond.Wait() if pf.closed { pf.lock.Unlock() @@ -131,6 +112,7 @@ func (pf *PreNegFilter) readLoop() { } } pf.lock.Unlock() + // fetch a node from the input that is not pending at the moment var node *enode.Node for { @@ -139,6 +121,7 @@ func (pf *PreNegFilter) readLoop() { return } node = pf.input.Node() + pf.lock.Lock() _, pending := pf.pending[node] pf.lock.Unlock() @@ -148,27 +131,24 @@ func (pf *PreNegFilter) readLoop() { } // set sfQueried and start the query pf.ns.SetState(node, pf.sfQueried, nodestate.Flags{}, pf.queryTimeout) - start, cancel := pf.query(node, func(canDial bool) { + start := pf.query(node, func(canDial bool) { if canDial { pf.lock.Lock() - pf.needQueries = 0 - pf.pendingQueries-- delete(pf.pending, node) + pf.needQueries = 0 pf.lock.Unlock() pf.ns.SetState(node, pf.sfCanDial, pf.sfQueried, pf.canDialTimeout) } else { - pf.ns.SetState(node, nodestate.Flags{}, pf.sfQueried, 0) pf.lock.Lock() - pf.pendingQueries-- delete(pf.pending, node) pf.checkQuery() pf.lock.Unlock() + pf.ns.SetState(node, nodestate.Flags{}, pf.sfQueried, 0) } }) // add pending entry before actually starting pf.lock.Lock() - pf.pendingQueries++ - pf.pending[node] = cancel + pf.pending[node] = struct{}{} pf.lock.Unlock() start() } @@ -177,11 +157,10 @@ func (pf *PreNegFilter) readLoop() { // Next moves to the next selectable node. func (pf *PreNegFilter) Next() bool { pf.lock.Lock() - pf.waitingForNext = true - // start queries if we cannot give a result immediately + pf.waitingForNext = true // start queries if we cannot give a result immediately pf.checkQuery() pf.lock.Unlock() - // get a result from the LIFO queue that returns nodes with active sfCanDial + next := pf.canDialIter.Next() pf.lock.Lock() pf.needQueries = 0 diff --git a/les/lespay/client/prenegfilter_test.go b/les/lespay/client/prenegfilter_test.go index c653010735e8..4ed43b210e02 100644 --- a/les/lespay/client/prenegfilter_test.go +++ b/les/lespay/client/prenegfilter_test.go @@ -18,6 +18,7 @@ package client import ( "math/rand" + "sync/atomic" "testing" "time" @@ -61,32 +62,35 @@ func TestPreNegFilter(t *testing.T) { } } - testQuery := func(node *enode.Node, result func(canDial bool)) (start, cancel func()) { + var timeout uint32 + testQuery := func(node *enode.Node, result func(canDial bool)) func() { idx := testNodeIndex(node.ID()) switch queryResult[idx] { case 0: - return func() { result(false) }, func() {} // negative answer + return func() { result(false) } // negative answer case 1: - return func() { result(true) }, func() {} // positive answer + return func() { result(true) } // positive answer case 2: - return func() {}, func() { result(false) } // timeout + return func() { + clock.Run(5 * time.Second) + atomic.AddUint32(&timeout, 1) + } // timeout } - return nil, nil + return nil } - pf := NewPreNegFilter(ns, enode.IterNodes(nodes), testQuery, sfTest1, sfTest2, 5, time.Second*5, time.Second*10, clock) + pf := NewPreNegFilter(ns, enode.IterNodes(nodes), testQuery, sfTest1, sfTest2, 5, time.Second*5, time.Second*10) pfr := enode.Filter(pf, func(node *enode.Node) bool { // remove canDial flag so that filter can keep querying ns.SetState(node, nodestate.Flags{}, sfTest2, 0) return true }) ns.Start() + l := len(enode.ReadNodes(pfr, pfTestTotal)) if l != pfTestPositive { t.Errorf("Wrong number of returned result (got %d, expected %d)", l, pfTestPositive) } - dt := time.Duration(clock.Now()) - expdt := time.Second * pfTestTimeout // 5 secs per timeout, 5 waiting in parallel - if dt != expdt { - t.Errorf("Wrong amount of time required (got %v, expected %v)", dt, expdt) + if atomic.LoadUint32(&timeout) != pfTestTimeout { + t.Errorf("Wrong amount of timeout query number(got %v, expected %v)", timeout, pfTestTimeout) } } diff --git a/les/serverpool.go b/les/serverpool.go index 1a9ec006ec98..ae57c0711cea 100644 --- a/les/serverpool.go +++ b/les/serverpool.go @@ -157,11 +157,7 @@ func newServerPool(db ethdb.KeyValueStore, dbKey []byte, vt *lpc.ValueTracker, d iter := enode.Iterator(s.mixer) if query != nil { - var testClock *mclock.Simulated - if testing { - testClock = mono.(*mclock.Simulated) - } - iter = lpc.NewPreNegFilter(s.ns, iter, query, sfQueried, sfCanDial, 5, time.Second*5, time.Second*10, testClock) + iter = lpc.NewPreNegFilter(s.ns, iter, query, sfQueried, sfCanDial, 5, time.Second*5, time.Second*10) } s.dialIterator = enode.Filter(iter, func(node *enode.Node) bool { s.ns.SetState(node, sfDialed, sfCanDial, time.Second*10) From 8941a890fedb6d6eb4fbe90987140df346461dee Mon Sep 17 00:00:00 2001 From: rjl493456442 Date: Mon, 11 May 2020 12:58:36 +0800 Subject: [PATCH 2/3] les/lespay/client: maintain the waiting list --- les/lespay/client/prenegfilter.go | 38 +++++++++++++++++-------------- 1 file changed, 21 insertions(+), 17 deletions(-) diff --git a/les/lespay/client/prenegfilter.go b/les/lespay/client/prenegfilter.go index 6ac44ef56486..18fba72265e6 100644 --- a/les/lespay/client/prenegfilter.go +++ b/les/lespay/client/prenegfilter.go @@ -27,17 +27,18 @@ import ( // PreNegFilter is a filter on an enode.Iterator that performs connection pre-negotiation // using the provided callback and only returns nodes that gave a positive answer recently. type PreNegFilter struct { - lock sync.Mutex - cond *sync.Cond - ns *nodestate.NodeStateMachine - sfQueried, sfCanDial nodestate.Flags - queryTimeout, canDialTimeout time.Duration - input, canDialIter enode.Iterator - query PreNegQuery - pending map[*enode.Node]struct{} - needQueries int - maxPendingQueries, canDialCount int - waitingForNext, closed bool + lock sync.Mutex + cond *sync.Cond + ns *nodestate.NodeStateMachine + sfQueried, sfCanDial nodestate.Flags + queryTimeout, canDialTimeout time.Duration + input, canDialIter enode.Iterator + query PreNegQuery + pending map[*enode.Node]struct{} + waiting map[*enode.Node]struct{} + needQueries int + maxPendingQueries int + waitingForNext, closed bool } // PreNegQuery callback performs connection pre-negotiation. @@ -59,22 +60,21 @@ func NewPreNegFilter(ns *nodestate.NodeStateMachine, input enode.Iterator, query sfQueried: sfQueried, sfCanDial: sfCanDial, queryTimeout: queryTimeout, - canDialTimeout: canDialTimeout, maxPendingQueries: maxPendingQueries, canDialIter: NewQueueIterator(ns, sfCanDial, nodestate.Flags{}, false), pending: make(map[*enode.Node]struct{}), + waiting: make(map[*enode.Node]struct{}), } pf.cond = sync.NewCond(&pf.lock) ns.SubscribeState(sfQueried.Or(sfCanDial), func(n *enode.Node, oldState, newState nodestate.Flags) { pf.lock.Lock() defer pf.lock.Unlock() - // Maintain the 'canDial' result counter if oldState.HasAll(sfCanDial) { - pf.canDialCount-- + delete(pf.waiting, n) } if newState.HasAll(sfCanDial) { - pf.canDialCount++ + pf.waiting[n] = struct{}{} } // Query timeout, remove it from the pending set and spin up one more query. if oldState.HasAll(sfQueried) && newState.HasNone(sfQueried.Or(sfCanDial)) { @@ -90,11 +90,14 @@ func NewPreNegFilter(ns *nodestate.NodeStateMachine, input enode.Iterator, query // checkQuery checks whether we need more queries and signals readLoop if necessary. func (pf *PreNegFilter) checkQuery() { - if pf.waitingForNext && pf.canDialCount == 0 { + if pf.waitingForNext && len(pf.waiting) == 0 { pf.needQueries = pf.maxPendingQueries } if pf.needQueries > len(pf.pending) { - pf.cond.Signal() + diff := pf.needQueries - len(pf.pending) + for i := 0; i < diff; i++ { + pf.cond.Signal() + } } } @@ -165,6 +168,7 @@ func (pf *PreNegFilter) Next() bool { pf.lock.Lock() pf.needQueries = 0 pf.waitingForNext = false + delete(pf.waiting, pf.Node()) pf.lock.Unlock() return next } From 0ed3a757896451489ab0bb7ba877cb9b1a0a1bfe Mon Sep 17 00:00:00 2001 From: rjl493456442 Date: Tue, 12 May 2020 14:40:19 +0800 Subject: [PATCH 3/3] les/lespay: add back cancel --- les/lespay/client/prenegfilter.go | 26 ++++++++++++++++---------- les/lespay/client/prenegfilter_test.go | 10 +++++----- 2 files changed, 21 insertions(+), 15 deletions(-) diff --git a/les/lespay/client/prenegfilter.go b/les/lespay/client/prenegfilter.go index 18fba72265e6..903e458b717d 100644 --- a/les/lespay/client/prenegfilter.go +++ b/les/lespay/client/prenegfilter.go @@ -34,15 +34,16 @@ type PreNegFilter struct { queryTimeout, canDialTimeout time.Duration input, canDialIter enode.Iterator query PreNegQuery - pending map[*enode.Node]struct{} + pending map[*enode.Node]func() waiting map[*enode.Node]struct{} needQueries int maxPendingQueries int waitingForNext, closed bool } -// PreNegQuery callback performs connection pre-negotiation. -type PreNegQuery func(n *enode.Node, result func(canDial bool)) func() +// PreNegQuery callback performs connection pre-negotiation. It will return +// two functions: _start_ and _cancel_ which can be used to control query. +type PreNegQuery func(n *enode.Node, result func(canDial bool)) (func(), func()) // NewPreNegFilter creates a new PreNegFilter. sfQueried is set for each queried node, sfCanDial // is set together with sfQueried being reset if the callback returned a positive answer. The output @@ -62,14 +63,12 @@ func NewPreNegFilter(ns *nodestate.NodeStateMachine, input enode.Iterator, query queryTimeout: queryTimeout, maxPendingQueries: maxPendingQueries, canDialIter: NewQueueIterator(ns, sfCanDial, nodestate.Flags{}, false), - pending: make(map[*enode.Node]struct{}), + pending: make(map[*enode.Node]func()), waiting: make(map[*enode.Node]struct{}), } pf.cond = sync.NewCond(&pf.lock) ns.SubscribeState(sfQueried.Or(sfCanDial), func(n *enode.Node, oldState, newState nodestate.Flags) { pf.lock.Lock() - defer pf.lock.Unlock() - if oldState.HasAll(sfCanDial) { delete(pf.waiting, n) } @@ -77,12 +76,19 @@ func NewPreNegFilter(ns *nodestate.NodeStateMachine, input enode.Iterator, query pf.waiting[n] = struct{}{} } // Query timeout, remove it from the pending set and spin up one more query. + // If the cancel function is specified, run it without holding the lock. + var cancel func() if oldState.HasAll(sfQueried) && newState.HasNone(sfQueried.Or(sfCanDial)) { - if _, exist := pf.pending[n]; exist { + if fn, exist := pf.pending[n]; exist { + cancel = fn delete(pf.pending, n) pf.checkQuery() } } + pf.lock.Unlock() + if cancel != nil { + cancel() + } }) go pf.readLoop() return pf @@ -134,7 +140,7 @@ func (pf *PreNegFilter) readLoop() { } // set sfQueried and start the query pf.ns.SetState(node, pf.sfQueried, nodestate.Flags{}, pf.queryTimeout) - start := pf.query(node, func(canDial bool) { + start, cancel := pf.query(node, func(canDial bool) { if canDial { pf.lock.Lock() delete(pf.pending, node) @@ -151,7 +157,7 @@ func (pf *PreNegFilter) readLoop() { }) // add pending entry before actually starting pf.lock.Lock() - pf.pending[node] = struct{}{} + pf.pending[node] = cancel pf.lock.Unlock() start() } @@ -186,4 +192,4 @@ func (pf *PreNegFilter) Close() { // Node returns the current node. func (pf *PreNegFilter) Node() *enode.Node { return pf.canDialIter.Node() -} +} \ No newline at end of file diff --git a/les/lespay/client/prenegfilter_test.go b/les/lespay/client/prenegfilter_test.go index 4ed43b210e02..4be17cbbb2ec 100644 --- a/les/lespay/client/prenegfilter_test.go +++ b/les/lespay/client/prenegfilter_test.go @@ -63,20 +63,20 @@ func TestPreNegFilter(t *testing.T) { } var timeout uint32 - testQuery := func(node *enode.Node, result func(canDial bool)) func() { + testQuery := func(node *enode.Node, result func(canDial bool)) (func(), func()) { idx := testNodeIndex(node.ID()) switch queryResult[idx] { case 0: - return func() { result(false) } // negative answer + return func() { result(false) }, func() {} // negative answer case 1: - return func() { result(true) } // positive answer + return func() { result(true) }, func() {} // positive answer case 2: return func() { clock.Run(5 * time.Second) atomic.AddUint32(&timeout, 1) - } // timeout + }, func() {} // timeout } - return nil + return nil, nil } pf := NewPreNegFilter(ns, enode.IterNodes(nodes), testQuery, sfTest1, sfTest2, 5, time.Second*5, time.Second*10) pfr := enode.Filter(pf, func(node *enode.Node) bool {