diff --git a/p2p/adapters/exec.go b/p2p/adapters/exec.go index 4eecd6b4fce9..7e7557b377c6 100644 --- a/p2p/adapters/exec.go +++ b/p2p/adapters/exec.go @@ -356,7 +356,7 @@ func (p *PeerAPI) PeerEvents(ctx context.Context) (*rpc.Subscription, error) { go func() { events := make(chan *p2p.PeerEvent) - sub := p.server().SubscribePeers(events) + sub := p.server().SubscribeEvents(events) defer sub.Unsubscribe() for { diff --git a/p2p/adapters/inproc.go b/p2p/adapters/inproc.go index 740e69bfc5df..844d988ca1fa 100644 --- a/p2p/adapters/inproc.go +++ b/p2p/adapters/inproc.go @@ -29,9 +29,9 @@ import ( "github.com/ethereum/go-ethereum/rpc" ) -func newPeer(rw *p2p.MsgPipeRW) *Peer { +func newPeer(rw MsgReadWriteCloser) *Peer { return &Peer{ - MsgPipeRW: rw, + MsgReadWriteCloser: rw, Errc: make(chan error, 1), Connc: make(chan bool), Readyc: make(chan bool), @@ -39,7 +39,7 @@ func newPeer(rw *p2p.MsgPipeRW) *Peer { } type Peer struct { - *p2p.MsgPipeRW + MsgReadWriteCloser Connc chan bool Readyc chan bool Errc chan error @@ -52,6 +52,12 @@ type Network interface { Reporter } +// Adds close pipe to the MsgReadWriter +type MsgReadWriteCloser interface { + p2p.MsgReadWriter + Close() error +} + // SimNode is the network adapter that type SimNode struct { lock sync.RWMutex @@ -169,7 +175,7 @@ func (self *SimNode) getPeer(id *NodeId) *Peer { return self.peers[i] } -func (self *SimNode) setPeer(id *NodeId, rw *p2p.MsgPipeRW) *Peer { +func (self *SimNode) setPeer(id *NodeId, rw MsgReadWriteCloser) *Peer { i, found := self.peerMap[id.NodeID] if !found { i = len(self.peers) @@ -183,7 +189,8 @@ func (self *SimNode) setPeer(id *NodeId, rw *p2p.MsgPipeRW) *Peer { // } // legit reconnect reset disconnection error, p := self.peers[i] - p.MsgPipeRW = rw + //p.MsgPipeRW = rw + p.MsgReadWriteCloser = rw p.Connc = make(chan bool) p.Readyc = make(chan bool) return p @@ -194,11 +201,12 @@ func (self *SimNode) RemovePeer(node *discover.Node) { defer self.lock.Unlock() id := &NodeId{node.ID} peer := self.getPeer(id) - if peer == nil || peer.MsgPipeRW == nil { + //if peer == nil || peer.MsgPipeRW == nil { + if peer == nil || peer.MsgReadWriteCloser == nil { return } - peer.MsgPipeRW.Close() - peer.MsgPipeRW = nil + peer.MsgReadWriteCloser.Close() + peer.MsgReadWriteCloser = nil // na := self.network.GetNodeAdapter(id) // peer = na.(*SimNode).GetPeer(self.Id) // peer.RW = nil @@ -216,7 +224,8 @@ func (self *SimNode) AddPeer(node *discover.Node) { rw, rrw := p2p.MsgPipe() // // run protocol on remote node with self as peer peer := self.getPeer(id) - if peer != nil && peer.MsgPipeRW != nil { + //if peer != nil && peer.MsgPipeRW != nil { + if peer != nil && peer.MsgReadWriteCloser != nil { return } peer = self.setPeer(id, rrw) @@ -228,7 +237,7 @@ func (self *SimNode) AddPeer(node *discover.Node) { self.RunProtocol(na.(*SimNode), rw, rrw, peer) } -func (self *SimNode) SubscribePeers(ch chan *p2p.PeerEvent) event.Subscription { +func (self *SimNode) SubscribeEvents(ch chan *p2p.PeerEvent) event.Subscription { return self.peerFeed.Subscribe(ch) } @@ -246,7 +255,7 @@ func (self *SimNode) PeersInfo() (info []*p2p.PeerInfo) { return nil } -func (self *SimNode) RunProtocol(node *SimNode, rw, rrw p2p.MsgReadWriter, peer *Peer) { +func (self *SimNode) RunProtocol(node *SimNode, rw, rrw MsgReadWriteCloser, peer *Peer) { id := node.Id protocol := self.service.Protocols()[0] if protocol.Run == nil { diff --git a/p2p/message.go b/p2p/message.go index ee8d0efbb1fc..1b44a9326d56 100644 --- a/p2p/message.go +++ b/p2p/message.go @@ -27,7 +27,11 @@ import ( "sync/atomic" "time" + "github.com/ethereum/go-ethereum/event" + "github.com/ethereum/go-ethereum/p2p/discover" "github.com/ethereum/go-ethereum/rlp" + + ) // Msg defines the structure of a p2p message. @@ -271,3 +275,57 @@ func ExpectMsg(r MsgReader, code uint64, content interface{}) error { } return nil } + + +// wraps a msgreadwriter to allow for emitting message events upon +// send or receive +type MsgReporterRW struct { + MsgReadWriter + feed *event.Feed + peerid discover.NodeID + closefunc func() error +} + +func NewMsgReporterRW(feed *event.Feed, rw MsgReadWriter, id discover.NodeID, closefunc func() error) *MsgReporterRW { + return &MsgReporterRW{ + MsgReadWriter: rw, + feed: feed, + peerid: id, + closefunc: closefunc, + } +} + +func (self *MsgReporterRW) ReadMsg() (Msg, error) { + msg, err := self.MsgReadWriter.ReadMsg() + if err != nil { + return msg, err + } + event := PeerEvent{ + Type: PeerEventTypeMsgRecv, + Peer: self.peerid, + Label: fmt.Sprintf("%d,%d", msg.Code, msg.Size), + } + self.feed.Send(event) + return msg, nil +} + +func (self *MsgReporterRW) WriteMsg(msg Msg) error { + err := self.MsgReadWriter.WriteMsg(msg) + if err != nil { + return err + } + event := PeerEvent{ + Type: PeerEventTypeMsgSend, + Peer: self.peerid, + Label: fmt.Sprintf("%d,%d", msg.Code, msg.Size), + } + self.feed.Send(event) + return nil +} + +func (self *MsgReporterRW) Close() error { + if self.closefunc != nil { + return self.closefunc() + } + return nil +} diff --git a/p2p/peer.go b/p2p/peer.go index 49126343c1fa..c815af1f0beb 100644 --- a/p2p/peer.go +++ b/p2p/peer.go @@ -71,6 +71,14 @@ const ( // PeerEventTypeDrop is the type of event emitted when a peer is // dropped from a p2p.Server PeerEventTypeDrop PeerEventType = "drop" + + // PeerEventTypeMsgSend is the type of event emitted when a + // message is successfully sent to a peer + PeerEventTypeMsgSend PeerEventType = "msgsend" + + // PeerEventTypeMsgSend is the type of event emitted when a + // message is successfully sent to a peer + PeerEventTypeMsgRecv PeerEventType = "msgrecv" ) // PeerEvent is an event emitted when peers are either added or dropped from @@ -78,6 +86,7 @@ const ( type PeerEvent struct { Type PeerEventType Peer discover.NodeID + Label string } // Peer represents a connected remote node. @@ -381,6 +390,10 @@ func (rw *protoRW) ReadMsg() (Msg, error) { } } +func (rw *protoRW) Close() error { + return nil +} + // PeerInfo represents a short summary of the information known about a connected // peer. Sub-protocol independent fields are contained and initialized here, with // protocol specifics delegated to all connected sub-protocols. diff --git a/p2p/server.go b/p2p/server.go index bba0c54a0892..1f22c479fce6 100644 --- a/p2p/server.go +++ b/p2p/server.go @@ -142,7 +142,7 @@ type Server interface { Stop() error AddPeer(node *discover.Node) RemovePeer(node *discover.Node) - SubscribePeers(ch chan *PeerEvent) event.Subscription + SubscribeEvents(ch chan *PeerEvent) event.Subscription PeerCount() int NodeInfo() *NodeInfo PeersInfo() []*PeerInfo @@ -309,7 +309,7 @@ func (srv *server) RemovePeer(node *discover.Node) { } // SubscribePeers subscribes the given channel to peer events -func (srv *server) SubscribePeers(ch chan *PeerEvent) event.Subscription { +func (srv *server) SubscribeEvents(ch chan *PeerEvent) event.Subscription { return srv.peerFeed.Subscribe(ch) } diff --git a/p2p/testing/protocolsession.go b/p2p/testing/protocolsession.go index eb8037d69446..87ffa7588b7c 100644 --- a/p2p/testing/protocolsession.go +++ b/p2p/testing/protocolsession.go @@ -79,7 +79,7 @@ func (self *ProtocolSession) trigger(trig Trigger) error { if peer == nil { panic(fmt.Sprintf("trigger: peer %v does not exist (1- %v)", trig.Peer, len(self.Ids))) } - if peer.MsgPipeRW == nil { + if peer.MsgReadWriteCloser == nil { return fmt.Errorf("trigger: peer %v unreachable", trig.Peer) } errc := make(chan error) @@ -112,7 +112,7 @@ func (self *ProtocolSession) expect(exp Expect) error { if peer == nil { panic(fmt.Sprintf("expect: peer %v does not exist (1- %v)", exp.Peer, len(self.Ids))) } - if peer.MsgPipeRW == nil { + if peer.MsgReadWriteCloser== nil { return fmt.Errorf("trigger: peer %v unreachable", exp.Peer) } @@ -247,7 +247,7 @@ func (self *ProtocolSession) TestDisconnected(disconnects ...*Disconnect) error func (self *ProtocolSession) Stop() { for _, id := range self.Ids { p := self.GetPeer(id) - if p != nil && p.MsgPipeRW != nil { + if p != nil && p.MsgReadWriteCloser != nil { p.Close() } }