Skip to content

Commit

Permalink
p2p/simulations: improvement and adding msg events
Browse files Browse the repository at this point in the history
* Conceals rw in messenger type
* Adds generic remote control for nodes in network sim
* Adds msg type and event to sim, journal
* Msg http output, http options handler
* Msg event added to "cytoscape" for output to visualizer
* Msg event structure changed to fit "cytodata" logic
* Fix for handling CORS/PUT requests:
* fallthrough OPTIONS method handler
* Access-Control-Allow-Methods header added (note that the servermux panics if sent empty method value through http)
  • Loading branch information
nolash authored and zelig committed Feb 12, 2017
1 parent 0285349 commit eab6b51
Show file tree
Hide file tree
Showing 14 changed files with 402 additions and 85 deletions.
41 changes: 21 additions & 20 deletions p2p/adapters/inproc.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,16 +25,17 @@ import (
"github.com/ethereum/go-ethereum/p2p/discover"
)

func newPeer(rw p2p.MsgReadWriter) *Peer {
func newPeer(m Messenger) *Peer {
return &Peer{
RW: rw,
Messenger: m,
Errc: make(chan error, 1),
Flushc: make(chan bool),
}
}

type Peer struct {
RW p2p.MsgReadWriter
//RW p2p.MsgReadWriter
Messenger
Errc chan error
Flushc chan bool
}
Expand All @@ -51,17 +52,17 @@ type SimNode struct {
lock sync.RWMutex
Id *NodeId
network Network
messenger Messenger
messenger func(p2p.MsgReadWriter) Messenger
peerMap map[discover.NodeID]int
peers []*Peer
Run ProtoCall
}

func (self *SimNode) Messenger() Messenger {
return self.messenger
func (self *SimNode) Messenger(rw p2p.MsgReadWriter) Messenger {
return self.messenger(rw)
}

func NewSimNode(id *NodeId, n Network, m Messenger) *SimNode {
func NewSimNode(id *NodeId, n Network, m func(p2p.MsgReadWriter) Messenger) *SimNode {
return &SimNode{
Id: id,
network: n,
Expand Down Expand Up @@ -95,24 +96,24 @@ func (self *SimNode) getPeer(id *NodeId) *Peer {
func (self *SimNode) SetPeer(id *NodeId, rw p2p.MsgReadWriter) {
self.lock.Lock()
defer self.lock.Unlock()
self.setPeer(id, rw)
self.setPeer(id, self.Messenger(rw))
}

func (self *SimNode) setPeer(id *NodeId, rw p2p.MsgReadWriter) *Peer {
func (self *SimNode) setPeer(id *NodeId, m Messenger) *Peer {
i, found := self.peerMap[id.NodeID]
if !found {
i = len(self.peers)
self.peerMap[id.NodeID] = i
p := newPeer(rw)
p := newPeer(m)
self.peers = append(self.peers, p)
return p
}
if self.peers[i] != nil && rw != nil {
if self.peers[i] != nil && m != nil {
panic(fmt.Sprintf("pipe for %v already set", id))
}
// legit reconnect reset disconnection error,
p := self.peers[i]
p.RW = rw
p.Messenger = m
return p
}

Expand All @@ -121,11 +122,11 @@ func (self *SimNode) Disconnect(rid []byte) error {
defer self.lock.Unlock()
id := NewNodeId(rid)
peer := self.getPeer(id)
if peer == nil || peer.RW == nil {
if peer == nil || peer.Messenger == nil {
return fmt.Errorf("already disconnected")
}
peer.RW.(*p2p.MsgPipeRW).Close()
peer.RW = nil
peer.Messenger.Close()
peer.Messenger = nil
// na := self.network.GetNodeAdapter(id)
// peer = na.(*SimNode).GetPeer(self.Id)
// peer.RW = nil
Expand All @@ -146,30 +147,30 @@ func (self *SimNode) Connect(rid []byte) error {
defer close(runc)
// run protocol on remote node with self as peer

err := na.(*SimNode).runProtocol(self.Id, rrw, rw, runc)
err := na.(ProtocolRunner).RunProtocol(self.Id, rrw, rw, runc)
if err != nil {
return fmt.Errorf("cannot run protocol (%v -> %v) %v", self.Id, id, err)
}
// run protocol on remote node with self as peer
err = self.runProtocol(id, rw, rrw, runc)
err = self.RunProtocol(id, rw, rrw, runc)
if err != nil {
return fmt.Errorf("cannot run protocol (%v -> %v): %v", id, self.Id, err)
}
self.network.DidConnect(self.Id, id)
return nil
}

func (self *SimNode) runProtocol(id *NodeId, rw, rrw p2p.MsgReadWriter, runc chan bool) error {
func (self *SimNode) RunProtocol(id *NodeId, rw, rrw p2p.MsgReadWriter, runc chan bool) error {
if self.Run == nil {
glog.V(6).Infof("no protocol starting on peer %v (connection with %v)", self.Id, id)
return nil
}
glog.V(6).Infof("protocol starting on peer %v (connection with %v)", self.Id, id)
peer := self.getPeer(id)
if peer != nil && peer.RW != nil {
if peer != nil && peer.Messenger != nil {
return fmt.Errorf("already connected %v to peer %v", self.Id, id)
}
peer = self.setPeer(id, rrw)
peer = self.setPeer(id, self.Messenger(rrw))
p := p2p.NewPeer(id.NodeID, Name(id.Bytes()), []p2p.Cap{})
go func() {
err := self.Run(p, rw)
Expand Down
28 changes: 19 additions & 9 deletions p2p/adapters/msgpipes.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,20 +31,30 @@ import (
// peer session test
// ExpectMsg(p2p.MsgReader, uint64, interface{}) error
// SendMsg(p2p.MsgWriter, uint64, interface{}) error
type SimPipe struct{}
type SimPipe struct{
rw p2p.MsgReadWriter
}

func (self *SimPipe) SendMsg(code uint64, msg interface{}) error {
return p2p.Send(self.rw, code, msg)
}

func (self *SimPipe) ReadMsg() (p2p.Msg, error) {
return self.rw.ReadMsg()
}

func (*SimPipe) SendMsg(w p2p.MsgWriter, code uint64, msg interface{}) error {
return p2p.Send(w, code, msg)
func (self *SimPipe) TriggerMsg(code uint64, msg interface{}) error {
return p2p.Send(self.rw, code, msg)
}

func (*SimPipe) ReadMsg(r p2p.MsgReader) (p2p.Msg, error) {
return r.ReadMsg()
func (self *SimPipe) ExpectMsg(code uint64, msg interface{}) error {
return p2p.ExpectMsg(self.rw, code, msg)
}

func (*SimPipe) TriggerMsg(w p2p.MsgWriter, code uint64, msg interface{}) error {
return p2p.Send(w, code, msg)
func (self *SimPipe) Close() {
self.rw.(*p2p.MsgPipeRW).Close()
}

func (*SimPipe) ExpectMsg(r p2p.MsgReader, code uint64, msg interface{}) error {
return p2p.ExpectMsg(r, code, msg)
func NewSimPipe(rw p2p.MsgReadWriter) Messenger {
return Messenger(&SimPipe{rw})
}
38 changes: 24 additions & 14 deletions p2p/adapters/rlpx.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package adapters
import (
"fmt"
"net"
//"encoding/binary"

"github.com/ethereum/go-ethereum/p2p"
"github.com/ethereum/go-ethereum/p2p/discover"
Expand All @@ -30,25 +31,27 @@ type RLPx struct {
id *NodeId
net *p2p.Server
addr []byte
m Messenger
m func(p2p.MsgReadWriter) Messenger
r Reporter
}

type RLPxMessenger struct {
rw p2p.MsgReadWriter
}

func NewRLPx(addr []byte, srv *p2p.Server, m Messenger) *RLPx {
if m == nil {
m = &RLPxMessenger{}
}
func NewRLPxMessenger(rw p2p.MsgReadWriter) Messenger {
return Messenger(&RLPxMessenger{rw: rw})
}

func NewRLPx(addr []byte, srv *p2p.Server, m func(p2p.MsgReadWriter) Messenger) *RLPx {
return &RLPx{
net: srv,
addr: addr,
m: m,
}
}

func NewReportingRLPx(addr []byte, srv *p2p.Server, m Messenger, r Reporter) *RLPx {
func NewReportingRLPx(addr []byte, srv *p2p.Server, m func(p2p.MsgReadWriter) Messenger, r Reporter) *RLPx {
rlpx := NewRLPx(addr, srv, m)
rlpx.r = r
srv.PeerConnHook = func(p *p2p.Peer) {
Expand All @@ -60,12 +63,16 @@ func NewReportingRLPx(addr []byte, srv *p2p.Server, m Messenger, r Reporter) *RL
return rlpx
}

func (RLPxMessenger) SendMsg(w p2p.MsgWriter, code uint64, msg interface{}) error {
return p2p.Send(w, code, msg)
func (self *RLPxMessenger) SendMsg(code uint64, msg interface{}) error {
return p2p.Send(self.rw, code, msg)
}

func (RLPxMessenger) ReadMsg(r p2p.MsgReader) (p2p.Msg, error) {
return r.ReadMsg()
func (self *RLPxMessenger) ReadMsg() (p2p.Msg, error) {
return self.rw.ReadMsg()
}

func (self *RLPxMessenger) Close() {

}

func (self *RLPx) LocalAddr() []byte {
Expand All @@ -83,12 +90,15 @@ func (self *RLPx) Connect(enode []byte) error {
return nil
}

func (self *RLPx) Messenger() Messenger {
return self.m
func (self *RLPx) Messenger(rw p2p.MsgReadWriter) Messenger {
return self.m(rw)
}

func (self *RLPx) Disconnect(p *p2p.Peer, rw p2p.MsgReadWriter) error {
p.Disconnect(p2p.DiscSubprotocolError)
//func (self *RLPx) Disconnect(p *p2p.Peer, rw p2p.MsgReadWriter) error {
func (self *RLPx) Disconnect(b []byte) error {
//p.Disconnect(p2p.DiscSubprotocolError)
//d, _ := binary.Uvarint(b)
//p.Disconnect(p2p.DiscReason(d))
return nil
}

Expand Down
33 changes: 30 additions & 3 deletions p2p/adapters/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package adapters

import (
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/p2p"
"github.com/ethereum/go-ethereum/p2p/discover"
)
Expand Down Expand Up @@ -63,8 +64,9 @@ func (self *NodeId) Label() string {
}

type Messenger interface {
SendMsg(p2p.MsgWriter, uint64, interface{}) error
ReadMsg(p2p.MsgReader) (p2p.Msg, error)
SendMsg(uint64, interface{}) error
ReadMsg() (p2p.Msg, error)
Close()
}

type NodeAdapter interface {
Expand All @@ -73,7 +75,12 @@ type NodeAdapter interface {
// Disconnect(*p2p.Peer, p2p.MsgReadWriter)
LocalAddr() []byte
ParseAddr([]byte, string) ([]byte, error)
Messenger() Messenger
// Messenger() Messenger <<<... old version
Messenger(p2p.MsgReadWriter) Messenger
}

type ProtocolRunner interface {
RunProtocol(id *NodeId, rw, rrw p2p.MsgReadWriter, runc chan bool) error
}

type StartAdapter interface {
Expand All @@ -85,3 +92,23 @@ type Reporter interface {
DidConnect(*NodeId, *NodeId) error
DidDisconnect(*NodeId, *NodeId) error
}


func RandomNodeId() *NodeId {
key, err := crypto.GenerateKey()
if err != nil {
panic("unable to generate key")
}
var id discover.NodeID
pubkey := crypto.FromECDSAPub(&key.PublicKey)
copy(id[:], pubkey[1:])
return &NodeId{id}
}

func RandomNodeIds(n int) []*NodeId {
var ids []*NodeId
for i := 0; i < n; i++ {
ids = append(ids, RandomNodeId())
}
return ids
}
43 changes: 31 additions & 12 deletions p2p/protocols/protocol.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,11 @@ type CodeMap struct {
messages map[reflect.Type]uint64 // index of types to codes, for sending by type
}

func (self *CodeMap) GetCode(msg interface{}) (uint64, bool) {
code, found := self.messages[reflect.TypeOf(msg)]
return code, found
}

func NewCodeMap(name string, version uint, maxMsgSize int, msgs ...interface{}) *CodeMap {
self := &CodeMap{
Name: name,
Expand All @@ -133,10 +138,6 @@ func NewCodeMap(name string, version uint, maxMsgSize int, msgs ...interface{})
return self
}

func (self *CodeMap) GetCode(msg interface{}) uint64 {
return self.messages[reflect.TypeOf(msg)]
}

func (self *CodeMap) Length() uint64 {
return uint64(len(self.codes))
}
Expand All @@ -157,6 +158,26 @@ func (self *CodeMap) Register(msgs ...interface{}) {
}
}

func NewProtocol(protocolname string, protocolversion uint, run func(*Peer) error, na adapters.NodeAdapter, ct *CodeMap) *p2p.Protocol {

r := func(p *p2p.Peer, rw p2p.MsgReadWriter) error {

m := na.Messenger(rw)

peer := NewPeer(p, ct, m, func() {})

return run(peer)

}

return &p2p.Protocol{
Name: protocolname,
Version: protocolversion,
Length: ct.Length(),
Run: r,
}
}

// A Peer represents a remote peer or protocol instance that is running on a peer connection with
// a remote peer
type Peer struct {
Expand All @@ -172,12 +193,11 @@ type Peer struct {
// this constructor is called by the p2p.Protocol#Run function
// the first two arguments are comming the arguments passed to p2p.Protocol.Run function
// the third argument is the CodeMap describing the protocol messages and options
func NewPeer(p *p2p.Peer, rw p2p.MsgReadWriter, ct *CodeMap, m adapters.Messenger, disconn func()) *Peer {
func NewPeer(p *p2p.Peer, ct *CodeMap, m adapters.Messenger, disconn func()) *Peer {
return &Peer{
ct: ct,
m: m,
Peer: p,
rw: rw,
handlers: make(map[reflect.Type][]func(interface{}) error),
disconnect: disconn,
}
Expand Down Expand Up @@ -229,13 +249,12 @@ func (self *Peer) Drop() {
// this low level call will be wrapped by libraries providing routed or broadcast sends
// but often just used to forward and push messages to directly connected peers
func (self *Peer) Send(msg interface{}) error {
typ := reflect.TypeOf(msg)
code, found := self.ct.messages[typ]
code, found := self.ct.GetCode(msg)
if !found {
return errorf(ErrInvalidMsgType, "%v", typ)
return errorf(ErrInvalidMsgType, "%v", code)
}
glog.V(logger.Debug).Infof("=> %v %v (%d)", msg, typ, code)
err := self.m.SendMsg(self.rw, uint64(code), msg)
glog.V(logger.Debug).Infof("=> %v (%d)", msg, code)
err := self.m.SendMsg(uint64(code), msg)
if err != nil {
self.Drop()
return errorf(ErrWrite, "(msg code: %v): %v", code, err)
Expand All @@ -249,7 +268,7 @@ func (self *Peer) Send(msg interface{}) error {
// checks message size, out-of-range message codes, handles decoding with reflection,
// call handlers as callback onside
func (self *Peer) handleIncoming() (interface{}, error) {
msg, err := self.m.ReadMsg(self.rw)
msg, err := self.m.ReadMsg()
if err != nil {
return nil, err
}
Expand Down
Loading

0 comments on commit eab6b51

Please sign in to comment.