Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

les: simplify prenegfilter #11

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
101 changes: 45 additions & 56 deletions les/lespay/client/prenegfilter.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,32 +20,30 @@ import (
"sync"
"time"

"github.com/ethereum/go-ethereum/common/mclock"
"github.com/ethereum/go-ethereum/p2p/enode"
"github.com/ethereum/go-ethereum/p2p/nodestate"
)

// PreNegFilter is a filter on an enode.Iterator that performs connection pre-negotiation
// using the provided callback and only returns nodes that gave a positive answer recently.
type PreNegFilter struct {
lock sync.Mutex
cond *sync.Cond
ns *nodestate.NodeStateMachine
sfQueried, sfCanDial nodestate.Flags
queryTimeout, canDialTimeout time.Duration
input, canDialIter enode.Iterator
query PreNegQuery
pending map[*enode.Node]func()
pendingQueries, needQueries int
maxPendingQueries, canDialCount int
waitingForNext, closed bool
testClock *mclock.Simulated
lock sync.Mutex
cond *sync.Cond
ns *nodestate.NodeStateMachine
sfQueried, sfCanDial nodestate.Flags
queryTimeout, canDialTimeout time.Duration
input, canDialIter enode.Iterator
query PreNegQuery
pending map[*enode.Node]func()
waiting map[*enode.Node]struct{}
needQueries int
maxPendingQueries int
waitingForNext, closed bool
}

// PreNegQuery callback performs connection pre-negotiation.
// Note: the result callback function should always be called, if it has not been called
// before then cancel should call it.
type PreNegQuery func(n *enode.Node, result func(canDial bool)) (start, cancel func())
// PreNegQuery callback performs connection pre-negotiation. It will return
// two functions: _start_ and _cancel_ which can be used to control query.
type PreNegQuery func(n *enode.Node, result func(canDial bool)) (func(), func())

// NewPreNegFilter creates a new PreNegFilter. sfQueried is set for each queried node, sfCanDial
// is set together with sfQueried being reset if the callback returned a positive answer. The output
Expand All @@ -55,36 +53,37 @@ type PreNegQuery func(n *enode.Node, result func(canDial bool)) (start, cancel f
// with an active sfCanDial flag and the output iterator is already being read. Note that until
// sfCanDial is reset or times out the filter won't start more queries even if the dial candidate
// has been returned by the output iterator.
// If a simulated clock is used for testing then it should be provided in order to advance
// clock when waiting for query results.
func NewPreNegFilter(ns *nodestate.NodeStateMachine, input enode.Iterator, query PreNegQuery, sfQueried, sfCanDial nodestate.Flags, maxPendingQueries int, queryTimeout, canDialTimeout time.Duration, testClock *mclock.Simulated) *PreNegFilter {
func NewPreNegFilter(ns *nodestate.NodeStateMachine, input enode.Iterator, query PreNegQuery, sfQueried, sfCanDial nodestate.Flags, maxPendingQueries int, queryTimeout, canDialTimeout time.Duration) *PreNegFilter {
pf := &PreNegFilter{
ns: ns,
input: input,
query: query,
sfQueried: sfQueried,
sfCanDial: sfCanDial,
queryTimeout: queryTimeout,
canDialTimeout: canDialTimeout,
maxPendingQueries: maxPendingQueries,
canDialIter: NewQueueIterator(ns, sfCanDial, nodestate.Flags{}, false),
pending: make(map[*enode.Node]func()),
testClock: testClock,
waiting: make(map[*enode.Node]struct{}),
}
pf.cond = sync.NewCond(&pf.lock)
ns.SubscribeState(sfQueried.Or(sfCanDial), func(n *enode.Node, oldState, newState nodestate.Flags) {
var cancel func()
pf.lock.Lock()
if oldState.HasAll(sfCanDial) {
pf.canDialCount--
delete(pf.waiting, n)
}
if newState.HasAll(sfCanDial) {
pf.canDialCount++
pf.waiting[n] = struct{}{}
}
pf.checkQuery()
// Query timeout, remove it from the pending set and spin up one more query.
// If the cancel function is specified, run it without holding the lock.
var cancel func()
if oldState.HasAll(sfQueried) && newState.HasNone(sfQueried.Or(sfCanDial)) {
// query timeout, call cancel function (the query result callback will remove it from the map)
cancel = pf.pending[n]
if fn, exist := pf.pending[n]; exist {
cancel = fn
delete(pf.pending, n)
pf.checkQuery()
}
}
pf.lock.Unlock()
if cancel != nil {
Expand All @@ -95,42 +94,34 @@ func NewPreNegFilter(ns *nodestate.NodeStateMachine, input enode.Iterator, query
return pf
}

// checkQuery checks whether we need more queries and signals readLoop if necessary
// checkQuery checks whether we need more queries and signals readLoop if necessary.
func (pf *PreNegFilter) checkQuery() {
if pf.waitingForNext && pf.canDialCount == 0 {
if pf.waitingForNext && len(pf.waiting) == 0 {
pf.needQueries = pf.maxPendingQueries
}
if pf.needQueries > pf.pendingQueries {
pf.cond.Signal()
if pf.needQueries > len(pf.pending) {
diff := pf.needQueries - len(pf.pending)
for i := 0; i < diff; i++ {
pf.cond.Signal()
}
}
}

// readLoop reads nodes from the input iterator and starts new queries if necessary
func (pf *PreNegFilter) readLoop() {
for {
pf.lock.Lock()
if pf.testClock != nil {
for pf.pendingQueries == pf.maxPendingQueries {
// advance simulated clock until our queries are finished or timed out
pf.lock.Unlock()
pf.testClock.Run(time.Second)
pf.lock.Lock()
if pf.closed {
pf.lock.Unlock()
return
}
}
}
for pf.needQueries <= pf.pendingQueries {
// either no queries are needed or we have enough pending; wait until more
// are needed
for pf.needQueries <= len(pf.pending) {
// either no queries are needed or we have enough pending;
// wait until more are needed
pf.cond.Wait()
if pf.closed {
pf.lock.Unlock()
return
}
}
pf.lock.Unlock()

// fetch a node from the input that is not pending at the moment
var node *enode.Node
for {
Expand All @@ -139,6 +130,7 @@ func (pf *PreNegFilter) readLoop() {
return
}
node = pf.input.Node()

pf.lock.Lock()
_, pending := pf.pending[node]
pf.lock.Unlock()
Expand All @@ -151,23 +143,20 @@ func (pf *PreNegFilter) readLoop() {
start, cancel := pf.query(node, func(canDial bool) {
if canDial {
pf.lock.Lock()
pf.needQueries = 0
pf.pendingQueries--
delete(pf.pending, node)
pf.needQueries = 0
pf.lock.Unlock()
pf.ns.SetState(node, pf.sfCanDial, pf.sfQueried, pf.canDialTimeout)
} else {
pf.ns.SetState(node, nodestate.Flags{}, pf.sfQueried, 0)
pf.lock.Lock()
pf.pendingQueries--
delete(pf.pending, node)
pf.checkQuery()
pf.lock.Unlock()
pf.ns.SetState(node, nodestate.Flags{}, pf.sfQueried, 0)
}
})
// add pending entry before actually starting
pf.lock.Lock()
pf.pendingQueries++
pf.pending[node] = cancel
pf.lock.Unlock()
start()
Expand All @@ -177,15 +166,15 @@ func (pf *PreNegFilter) readLoop() {
// Next moves to the next selectable node.
func (pf *PreNegFilter) Next() bool {
pf.lock.Lock()
pf.waitingForNext = true
// start queries if we cannot give a result immediately
pf.waitingForNext = true // start queries if we cannot give a result immediately
pf.checkQuery()
pf.lock.Unlock()
// get a result from the LIFO queue that returns nodes with active sfCanDial

next := pf.canDialIter.Next()
pf.lock.Lock()
pf.needQueries = 0
pf.waitingForNext = false
delete(pf.waiting, pf.Node())
pf.lock.Unlock()
return next
}
Expand All @@ -203,4 +192,4 @@ func (pf *PreNegFilter) Close() {
// Node returns the current node.
func (pf *PreNegFilter) Node() *enode.Node {
return pf.canDialIter.Node()
}
}
18 changes: 11 additions & 7 deletions les/lespay/client/prenegfilter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package client

import (
"math/rand"
"sync/atomic"
"testing"
"time"

Expand Down Expand Up @@ -61,32 +62,35 @@ func TestPreNegFilter(t *testing.T) {
}
}

testQuery := func(node *enode.Node, result func(canDial bool)) (start, cancel func()) {
var timeout uint32
testQuery := func(node *enode.Node, result func(canDial bool)) (func(), func()) {
idx := testNodeIndex(node.ID())
switch queryResult[idx] {
case 0:
return func() { result(false) }, func() {} // negative answer
case 1:
return func() { result(true) }, func() {} // positive answer
case 2:
return func() {}, func() { result(false) } // timeout
return func() {
clock.Run(5 * time.Second)
atomic.AddUint32(&timeout, 1)
}, func() {} // timeout
}
return nil, nil
}
pf := NewPreNegFilter(ns, enode.IterNodes(nodes), testQuery, sfTest1, sfTest2, 5, time.Second*5, time.Second*10, clock)
pf := NewPreNegFilter(ns, enode.IterNodes(nodes), testQuery, sfTest1, sfTest2, 5, time.Second*5, time.Second*10)
pfr := enode.Filter(pf, func(node *enode.Node) bool {
// remove canDial flag so that filter can keep querying
ns.SetState(node, nodestate.Flags{}, sfTest2, 0)
return true
})
ns.Start()

l := len(enode.ReadNodes(pfr, pfTestTotal))
if l != pfTestPositive {
t.Errorf("Wrong number of returned result (got %d, expected %d)", l, pfTestPositive)
}
dt := time.Duration(clock.Now())
expdt := time.Second * pfTestTimeout // 5 secs per timeout, 5 waiting in parallel
if dt != expdt {
t.Errorf("Wrong amount of time required (got %v, expected %v)", dt, expdt)
if atomic.LoadUint32(&timeout) != pfTestTimeout {
t.Errorf("Wrong amount of timeout query number(got %v, expected %v)", timeout, pfTestTimeout)
}
}
6 changes: 1 addition & 5 deletions les/serverpool.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,11 +157,7 @@ func newServerPool(db ethdb.KeyValueStore, dbKey []byte, vt *lpc.ValueTracker, d

iter := enode.Iterator(s.mixer)
if query != nil {
var testClock *mclock.Simulated
if testing {
testClock = mono.(*mclock.Simulated)
}
iter = lpc.NewPreNegFilter(s.ns, iter, query, sfQueried, sfCanDial, 5, time.Second*5, time.Second*10, testClock)
iter = lpc.NewPreNegFilter(s.ns, iter, query, sfQueried, sfCanDial, 5, time.Second*5, time.Second*10)
}
s.dialIterator = enode.Filter(iter, func(node *enode.Node) bool {
s.ns.SetState(node, sfDialed, sfCanDial, time.Second*10)
Expand Down