Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

les: UDP pre-negotiation of available server capacity #22183

Merged
merged 7 commits into from
Mar 1, 2021
Merged
5 changes: 2 additions & 3 deletions common/prque/lazyqueue.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ type LazyQueue struct {
}

type (
PriorityCallback func(data interface{}, now mclock.AbsTime) int64 // actual priority callback
PriorityCallback func(data interface{}) int64 // actual priority callback
MaxPriorityCallback func(data interface{}, until mclock.AbsTime) int64 // estimated maximum priority callback
)

Expand Down Expand Up @@ -139,11 +139,10 @@ func (q *LazyQueue) peekIndex() int {
// Pop multiple times. Popped items are passed to the callback. MultiPop returns
// when the callback returns false or there are no more items to pop.
func (q *LazyQueue) MultiPop(callback func(data interface{}, priority int64) bool) {
now := q.clock.Now()
nextIndex := q.peekIndex()
for nextIndex != -1 {
data := heap.Pop(q.queue[nextIndex]).(*item).value
heap.Push(q.popQueue, &item{data, q.priority(data, now)})
heap.Push(q.popQueue, &item{data, q.priority(data)})
nextIndex = q.peekIndex()
for q.popQueue.Len() != 0 && (nextIndex == -1 || q.queue[nextIndex].blocks[0][0].priority < q.popQueue.blocks[0][0].priority) {
i := heap.Pop(q.popQueue).(*item)
Expand Down
2 changes: 1 addition & 1 deletion common/prque/lazyqueue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ type lazyItem struct {
index int
}

func testPriority(a interface{}, now mclock.AbsTime) int64 {
func testPriority(a interface{}) int64 {
return a.(*lazyItem).p
}

Expand Down
85 changes: 72 additions & 13 deletions les/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,30 +36,33 @@ import (
"github.com/ethereum/go-ethereum/eth/gasprice"
"github.com/ethereum/go-ethereum/event"
"github.com/ethereum/go-ethereum/internal/ethapi"
"github.com/ethereum/go-ethereum/les/vflux"
vfc "github.com/ethereum/go-ethereum/les/vflux/client"
"github.com/ethereum/go-ethereum/light"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/node"
"github.com/ethereum/go-ethereum/p2p"
"github.com/ethereum/go-ethereum/p2p/enode"
"github.com/ethereum/go-ethereum/p2p/enr"
"github.com/ethereum/go-ethereum/params"
"github.com/ethereum/go-ethereum/rlp"
"github.com/ethereum/go-ethereum/rpc"
)

type LightEthereum struct {
lesCommons

peers *serverPeerSet
reqDist *requestDistributor
retriever *retrieveManager
odr *LesOdr
relay *lesTxRelay
handler *clientHandler
txPool *light.TxPool
blockchain *light.LightChain
serverPool *vfc.ServerPool
dialCandidates enode.Iterator
pruner *pruner
peers *serverPeerSet
reqDist *requestDistributor
retriever *retrieveManager
odr *LesOdr
relay *lesTxRelay
handler *clientHandler
txPool *light.TxPool
blockchain *light.LightChain
serverPool *vfc.ServerPool
serverPoolIterator enode.Iterator
pruner *pruner

bloomRequests chan chan *bloombits.Retrieval // Channel receiving bloom data retrieval requests
bloomIndexer *core.ChainIndexer // Bloom indexer operating during block imports
Expand Down Expand Up @@ -112,7 +115,7 @@ func New(stack *node.Node, config *ethconfig.Config) (*LightEthereum, error) {
p2pConfig: &stack.Config().P2P,
}

leth.serverPool, leth.dialCandidates = vfc.NewServerPool(lesDb, []byte("serverpool:"), time.Second, nil, &mclock.System{}, config.UltraLightServers, requestList)
leth.serverPool, leth.serverPoolIterator = vfc.NewServerPool(lesDb, []byte("serverpool:"), time.Second, leth.prenegQuery, &mclock.System{}, config.UltraLightServers, requestList)
leth.serverPool.AddMetrics(suggestedTimeoutGauge, totalValueGauge, serverSelectableGauge, serverConnectedGauge, sessionValueMeter, serverDialedMeter)

leth.retriever = newRetrieveManager(peers, leth.reqDist, leth.serverPool.GetTimeout)
Expand Down Expand Up @@ -189,6 +192,62 @@ func New(stack *node.Node, config *ethconfig.Config) (*LightEthereum, error) {
return leth, nil
}

// VfluxRequest sends a batch of requests to the given node through discv5 UDP TalkRequest and returns the responses
func (s *LightEthereum) VfluxRequest(n *enode.Node, reqs vflux.Requests) vflux.Replies {
reqsEnc, _ := rlp.EncodeToBytes(&reqs)
repliesEnc, _ := s.p2pServer.DiscV5.TalkRequest(s.serverPool.DialNode(n), "vfx", reqsEnc)
var replies vflux.Replies
if len(repliesEnc) == 0 || rlp.DecodeBytes(repliesEnc, &replies) != nil {
return nil
}
return replies
}

// vfxVersion returns the version number of the "les" service subdomain of the vflux UDP
// service, as advertised in the ENR record
func (s *LightEthereum) vfxVersion(n *enode.Node) uint {
if n.Seq() == 0 {
var err error
if n, err = s.p2pServer.DiscV5.RequestENR(n); n != nil && err == nil && n.Seq() != 0 {
s.serverPool.Persist(n)
} else {
return 0
}
}

var les []rlp.RawValue
if err := n.Load(enr.WithEntry("les", &les)); err != nil || len(les) < 1 {
return 0
}
var version uint
rlp.DecodeBytes(les[0], &version) // Ignore additional fields (for forward compatibility).
return version
}

// prenegQuery sends a capacity query to the given server node to determine whether
// a connection slot is immediately available
func (s *LightEthereum) prenegQuery(n *enode.Node) int {
if s.vfxVersion(n) < 1 {
// UDP query not supported, always try TCP connection
return 1
}

var requests vflux.Requests
requests.Add("les", vflux.CapacityQueryName, vflux.CapacityQueryReq{
Bias: 180,
AddTokens: []vflux.IntOrInf{{}},
})
replies := s.VfluxRequest(n, requests)
zsfelfoldi marked this conversation as resolved.
Show resolved Hide resolved
var cqr vflux.CapacityQueryReply
if replies.Get(0, &cqr) != nil || len(cqr) != 1 { // Note: Get returns an error if replies is nil
return -1
}
if cqr[0] > 0 {
return 1
}
return 0
}

type LightDummyAPI struct{}

// Etherbase is the address that mining rewards will be send to
Expand Down Expand Up @@ -269,7 +328,7 @@ func (s *LightEthereum) Protocols() []p2p.Protocol {
return p.Info()
}
return nil
}, s.dialCandidates)
}, s.serverPoolIterator)
}

// Start implements node.Lifecycle, starting all internal goroutines needed by the
Expand Down
55 changes: 55 additions & 0 deletions les/clientpool.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,13 @@ import (
"github.com/ethereum/go-ethereum/common/mclock"
"github.com/ethereum/go-ethereum/ethdb"
"github.com/ethereum/go-ethereum/les/utils"
"github.com/ethereum/go-ethereum/les/vflux"
vfs "github.com/ethereum/go-ethereum/les/vflux/server"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/p2p/enode"
"github.com/ethereum/go-ethereum/p2p/enr"
"github.com/ethereum/go-ethereum/p2p/nodestate"
"github.com/ethereum/go-ethereum/rlp"
)

const (
Expand Down Expand Up @@ -382,3 +384,56 @@ func (f *clientPool) forClients(ids []enode.ID, cb func(client *clientInfo)) {
}
}
}

// serveCapQuery serves a vflux capacity query. It receives multiple token amount values
// and a bias time value. For each given token amount it calculates the maximum achievable
// capacity in case the amount is added to the balance.
func (f *clientPool) serveCapQuery(id enode.ID, freeID string, data []byte) []byte {
var req vflux.CapacityQueryReq
if rlp.DecodeBytes(data, &req) != nil {
return nil
}
if l := len(req.AddTokens); l == 0 || l > vflux.CapacityQueryMaxLen {
return nil
}
node := f.ns.GetNode(id)
if node == nil {
node = enode.SignNull(&enr.Record{}, id)
}
c, _ := f.ns.GetField(node, clientInfoField).(*clientInfo)
if c == nil {
c = &clientInfo{node: node}
f.ns.SetField(node, clientInfoField, c)
f.ns.SetField(node, connAddressField, freeID)
defer func() {
f.ns.SetField(node, connAddressField, nil)
f.ns.SetField(node, clientInfoField, nil)
}()
if c.balance, _ = f.ns.GetField(node, f.BalanceField).(*vfs.NodeBalance); c.balance == nil {
log.Error("BalanceField is missing", "node", node.ID())
return nil
}
}
// use vfs.CapacityCurve to answer request for multiple newly bought token amounts
curve := f.pp.GetCapacityCurve().Exclude(id)
result := make(vflux.CapacityQueryReply, len(req.AddTokens))
bias := time.Second * time.Duration(req.Bias)
if f.connectedBias > bias {
bias = f.connectedBias
}
pb, _ := c.balance.GetBalance()
for i, addTokens := range req.AddTokens {
add := addTokens.Int64()
result[i] = curve.MaxCapacity(func(capacity uint64) int64 {
return c.balance.EstimatePriority(capacity, add, 0, bias, false) / int64(capacity)
})
if add <= 0 && uint64(-add) >= pb && result[i] > f.minCap {
result[i] = f.minCap
}
if result[i] < f.minCap {
result[i] = 0
}
}
reply, _ := rlp.EncodeToBytes(&result)
return reply
}
6 changes: 4 additions & 2 deletions les/clientpool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -508,8 +508,10 @@ func TestNegativeBalanceCalculation(t *testing.T) {
for i := 0; i < 10; i++ {
pool.disconnect(newPoolTestPeer(i, nil))
_, nb := getBalance(pool, newPoolTestPeer(i, nil))
if checkDiff(nb, uint64(time.Minute)/1000) {
t.Fatalf("Negative balance mismatch, want %v, got %v", uint64(time.Minute)/1000, nb)
exp := uint64(time.Minute) / 1000
exp -= exp / 120 // correct for negative balance expiration
if checkDiff(nb, exp) {
t.Fatalf("Negative balance mismatch, want %v, got %v", exp, nb)
}
}
}
Expand Down
3 changes: 2 additions & 1 deletion les/enr_entry.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,8 @@ import (
// lesEntry is the "les" ENR entry. This is set for LES servers only.
type lesEntry struct {
// Ignore additional fields (for forward compatibility).
_ []rlp.RawValue `rlp:"tail"`
VfxVersion uint
Rest []rlp.RawValue `rlp:"tail"`
}

func (lesEntry) ENRKey() string { return "les" }
Expand Down
29 changes: 26 additions & 3 deletions les/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/ethereum/go-ethereum/eth/ethconfig"
"github.com/ethereum/go-ethereum/ethdb"
"github.com/ethereum/go-ethereum/les/flowcontrol"
"github.com/ethereum/go-ethereum/les/vflux"
vfs "github.com/ethereum/go-ethereum/les/vflux/server"
"github.com/ethereum/go-ethereum/light"
"github.com/ethereum/go-ethereum/log"
Expand Down Expand Up @@ -68,6 +69,7 @@ type LesServer struct {
archiveMode bool // Flag whether the ethereum node runs in archive mode.
handler *serverHandler
broadcaster *broadcaster
vfluxServer *vfs.Server
privateKey *ecdsa.PrivateKey

// Flow control and capacity management
Expand Down Expand Up @@ -112,12 +114,14 @@ func NewLesServer(node *node.Node, e ethBackend, config *ethconfig.Config) (*Les
ns: ns,
archiveMode: e.ArchiveMode(),
broadcaster: newBroadcaster(ns),
vfluxServer: vfs.NewServer(time.Millisecond * 10),
fcManager: flowcontrol.NewClientManager(nil, &mclock.System{}),
servingQueue: newServingQueue(int64(time.Millisecond*10), float64(config.LightServ)/100),
threadsBusy: config.LightServ/100 + 1,
threadsIdle: threads,
p2pSrv: node.Server(),
}
srv.vfluxServer.Register(srv)
issync := e.Synced
if config.LightNoSyncServe {
issync = func() bool { return true }
Expand Down Expand Up @@ -201,7 +205,9 @@ func (s *LesServer) Protocols() []p2p.Protocol {
}, nil)
// Add "les" ENR entries.
for i := range ps {
ps[i].Attributes = []enr.Entry{&lesEntry{}}
ps[i].Attributes = []enr.Entry{&lesEntry{
VfxVersion: 1,
}}
}
return ps
}
Expand All @@ -211,10 +217,11 @@ func (s *LesServer) Start() error {
s.privateKey = s.p2pSrv.PrivateKey
s.broadcaster.setSignerKey(s.privateKey)
s.handler.start()

s.wg.Add(1)
go s.capacityManagement()

if s.p2pSrv.DiscV5 != nil {
s.p2pSrv.DiscV5.RegisterTalkHandler("vfx", s.vfluxServer.ServeEncoded)
}
return nil
}

Expand All @@ -228,6 +235,7 @@ func (s *LesServer) Stop() error {
s.costTracker.stop()
s.handler.stop()
s.servingQueue.stop()
s.vfluxServer.Stop()

// Note, bloom trie indexer is closed by parent bloombits indexer.
s.chtIndexer.Close()
Expand Down Expand Up @@ -311,3 +319,18 @@ func (s *LesServer) dropClient(id enode.ID) {
p.Peer.Disconnect(p2p.DiscRequested)
}
}

// ServiceInfo implements vfs.Service
func (s *LesServer) ServiceInfo() (string, string) {
return "les", "Ethereum light client service"
}

// Handle implements vfs.Service
func (s *LesServer) Handle(id enode.ID, address string, name string, data []byte) []byte {
switch name {
case vflux.CapacityQueryName:
return s.clientPool.serveCapQuery(id, address, data)
default:
return nil
}
}
Loading