Skip to content

Commit

Permalink
Merge pull request #14 from rjl493456442/clientpool-refactor2
Browse files Browse the repository at this point in the history
les: polish clientdb
  • Loading branch information
zsfelfoldi authored Jun 23, 2020
2 parents 70aa1c2 + 5936462 commit 3d323ef
Show file tree
Hide file tree
Showing 3 changed files with 39 additions and 29 deletions.
22 changes: 4 additions & 18 deletions les/lespay/server/balance.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,24 +107,10 @@ func NewBalanceTracker(ns *nodestate.NodeStateMachine, setup BalanceTrackerSetup
negExp: negExp,
quit: make(chan struct{}),
}
var start enode.ID
for {
ids := bt.ndb.getPosBalanceIDs(start, enode.ID{}, 1000)
var stop bool
l := len(ids)
if l == 1000 {
l--
start = ids[l]
} else {
stop = true
}
for i := 0; i < l; i++ {
bt.totalAmount.AddExp(bt.ndb.getOrNewBalance(ids[i].Bytes(), false))
}
if stop {
break
}
}
bt.ndb.forEachBalance(false, func(id enode.ID, balance utils.ExpiredValue) bool {
bt.totalAmount.AddExp(balance)
return true
})

ns.SubscribeField(bt.capacityField, func(node *enode.Node, state nodestate.Flags, oldValue, newValue interface{}) {
n, _ := ns.GetField(node, bt.BalanceField).(*NodeBalance)
Expand Down
43 changes: 33 additions & 10 deletions les/lespay/server/clientdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,23 +157,20 @@ func (db *nodeDB) getPosBalanceIDs(start, stop enode.ID, maxCount int) (result [
return
}
prefix := db.getPrefix(false)
keylen := len(prefix) + len(enode.ID{})

it := db.db.NewIterator(prefix, start.Bytes())
defer it.Release()
for i := len(stop[:]) - 1; i >= 0; i-- {
stop[i]--
if stop[i] != 255 {
break
}
}
stopKey := db.key(stop.Bytes(), false)
keyLen := len(stopKey)

for it.Next() {
var id enode.ID
if len(it.Key()) != keyLen || bytes.Compare(it.Key(), stopKey) == 1 {
if len(it.Key()) != keylen {
return
}
copy(id[:], it.Key()[keylen-len(id):])
if bytes.Compare(id.Bytes(), stop.Bytes()) >= 0 {
return
}
copy(id[:], it.Key()[keyLen-len(id):])
result = append(result, id)
if len(result) == maxCount {
return
Expand All @@ -182,6 +179,32 @@ func (db *nodeDB) getPosBalanceIDs(start, stop enode.ID, maxCount int) (result [
return
}

// forEachBalance iterates all balances and passes values to callback.
func (db *nodeDB) forEachBalance(neg bool, callback func(id enode.ID, balance utils.ExpiredValue) bool) {
prefix := db.getPrefix(neg)
keylen := len(prefix) + len(enode.ID{})

it := db.db.NewIterator(prefix, nil)
defer it.Release()

for it.Next() {
var id enode.ID
if len(it.Key()) != keylen {
return
}
copy(id[:], it.Key()[keylen-len(id):])

var b utils.ExpiredValue
if err := rlp.DecodeBytes(it.Value(), &b); err != nil {
continue
}
if !callback(id, b) {
return
}
}
return
}

func (db *nodeDB) expirer() {
for {
select {
Expand Down
3 changes: 2 additions & 1 deletion les/server_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -299,7 +299,8 @@ func (h *serverHandler) handleMsg(p *clientPeer, wg *sync.WaitGroup) error {
// Feed cost tracker request serving statistic.
h.server.costTracker.updateStats(msg.Code, amount, servingTime, realCost)
// Reduce priority "balance" for the specific peer.
/*balance =*/ p.balance.RequestServed(realCost)
/*balance =*/
p.balance.RequestServed(realCost)
}
if reply != nil {
p.queueSend(func() {
Expand Down

0 comments on commit 3d323ef

Please sign in to comment.