From 8c82519273eeb69b8e50119eb4ac97333c62afb9 Mon Sep 17 00:00:00 2001 From: nolash Date: Sun, 30 Apr 2017 19:36:16 +0200 Subject: [PATCH] p2p, p2p/adapters, p2p/testing: message reporting for simulation layer Adds a MsgReporter struct that wraps MsgReadWriter that posts message read and write to an subscribable event.Feed --- p2p/adapters/exec.go | 2 +- p2p/adapters/inproc.go | 31 +++++++++++------ p2p/message.go | 58 +++++++++++++++++++++++++++++++ p2p/peer.go | 13 +++++++ p2p/server.go | 4 +-- p2p/testing/protocolsession.go | 6 ++-- swarm/api/ws/server.go | 54 ----------------------------- swarm/api/ws/server_test.go | 62 ---------------------------------- 8 files changed, 97 insertions(+), 133 deletions(-) delete mode 100644 swarm/api/ws/server.go delete mode 100644 swarm/api/ws/server_test.go diff --git a/p2p/adapters/exec.go b/p2p/adapters/exec.go index 4eecd6b4fce9..7e7557b377c6 100644 --- a/p2p/adapters/exec.go +++ b/p2p/adapters/exec.go @@ -356,7 +356,7 @@ func (p *PeerAPI) PeerEvents(ctx context.Context) (*rpc.Subscription, error) { go func() { events := make(chan *p2p.PeerEvent) - sub := p.server().SubscribePeers(events) + sub := p.server().SubscribeEvents(events) defer sub.Unsubscribe() for { diff --git a/p2p/adapters/inproc.go b/p2p/adapters/inproc.go index 740e69bfc5df..844d988ca1fa 100644 --- a/p2p/adapters/inproc.go +++ b/p2p/adapters/inproc.go @@ -29,9 +29,9 @@ import ( "github.com/ethereum/go-ethereum/rpc" ) -func newPeer(rw *p2p.MsgPipeRW) *Peer { +func newPeer(rw MsgReadWriteCloser) *Peer { return &Peer{ - MsgPipeRW: rw, + MsgReadWriteCloser: rw, Errc: make(chan error, 1), Connc: make(chan bool), Readyc: make(chan bool), @@ -39,7 +39,7 @@ func newPeer(rw *p2p.MsgPipeRW) *Peer { } type Peer struct { - *p2p.MsgPipeRW + MsgReadWriteCloser Connc chan bool Readyc chan bool Errc chan error @@ -52,6 +52,12 @@ type Network interface { Reporter } +// Adds close pipe to the MsgReadWriter +type MsgReadWriteCloser interface { + p2p.MsgReadWriter + Close() error +} + // SimNode is the network adapter that type SimNode struct { lock sync.RWMutex @@ -169,7 +175,7 @@ func (self *SimNode) getPeer(id *NodeId) *Peer { return self.peers[i] } -func (self *SimNode) setPeer(id *NodeId, rw *p2p.MsgPipeRW) *Peer { +func (self *SimNode) setPeer(id *NodeId, rw MsgReadWriteCloser) *Peer { i, found := self.peerMap[id.NodeID] if !found { i = len(self.peers) @@ -183,7 +189,8 @@ func (self *SimNode) setPeer(id *NodeId, rw *p2p.MsgPipeRW) *Peer { // } // legit reconnect reset disconnection error, p := self.peers[i] - p.MsgPipeRW = rw + //p.MsgPipeRW = rw + p.MsgReadWriteCloser = rw p.Connc = make(chan bool) p.Readyc = make(chan bool) return p @@ -194,11 +201,12 @@ func (self *SimNode) RemovePeer(node *discover.Node) { defer self.lock.Unlock() id := &NodeId{node.ID} peer := self.getPeer(id) - if peer == nil || peer.MsgPipeRW == nil { + //if peer == nil || peer.MsgPipeRW == nil { + if peer == nil || peer.MsgReadWriteCloser == nil { return } - peer.MsgPipeRW.Close() - peer.MsgPipeRW = nil + peer.MsgReadWriteCloser.Close() + peer.MsgReadWriteCloser = nil // na := self.network.GetNodeAdapter(id) // peer = na.(*SimNode).GetPeer(self.Id) // peer.RW = nil @@ -216,7 +224,8 @@ func (self *SimNode) AddPeer(node *discover.Node) { rw, rrw := p2p.MsgPipe() // // run protocol on remote node with self as peer peer := self.getPeer(id) - if peer != nil && peer.MsgPipeRW != nil { + //if peer != nil && peer.MsgPipeRW != nil { + if peer != nil && peer.MsgReadWriteCloser != nil { return } peer = self.setPeer(id, rrw) @@ -228,7 +237,7 @@ func (self *SimNode) AddPeer(node *discover.Node) { self.RunProtocol(na.(*SimNode), rw, rrw, peer) } -func (self *SimNode) SubscribePeers(ch chan *p2p.PeerEvent) event.Subscription { +func (self *SimNode) SubscribeEvents(ch chan *p2p.PeerEvent) event.Subscription { return self.peerFeed.Subscribe(ch) } @@ -246,7 +255,7 @@ func (self *SimNode) PeersInfo() (info []*p2p.PeerInfo) { return nil } -func (self *SimNode) RunProtocol(node *SimNode, rw, rrw p2p.MsgReadWriter, peer *Peer) { +func (self *SimNode) RunProtocol(node *SimNode, rw, rrw MsgReadWriteCloser, peer *Peer) { id := node.Id protocol := self.service.Protocols()[0] if protocol.Run == nil { diff --git a/p2p/message.go b/p2p/message.go index ee8d0efbb1fc..1b44a9326d56 100644 --- a/p2p/message.go +++ b/p2p/message.go @@ -27,7 +27,11 @@ import ( "sync/atomic" "time" + "github.com/ethereum/go-ethereum/event" + "github.com/ethereum/go-ethereum/p2p/discover" "github.com/ethereum/go-ethereum/rlp" + + ) // Msg defines the structure of a p2p message. @@ -271,3 +275,57 @@ func ExpectMsg(r MsgReader, code uint64, content interface{}) error { } return nil } + + +// wraps a msgreadwriter to allow for emitting message events upon +// send or receive +type MsgReporterRW struct { + MsgReadWriter + feed *event.Feed + peerid discover.NodeID + closefunc func() error +} + +func NewMsgReporterRW(feed *event.Feed, rw MsgReadWriter, id discover.NodeID, closefunc func() error) *MsgReporterRW { + return &MsgReporterRW{ + MsgReadWriter: rw, + feed: feed, + peerid: id, + closefunc: closefunc, + } +} + +func (self *MsgReporterRW) ReadMsg() (Msg, error) { + msg, err := self.MsgReadWriter.ReadMsg() + if err != nil { + return msg, err + } + event := PeerEvent{ + Type: PeerEventTypeMsgRecv, + Peer: self.peerid, + Label: fmt.Sprintf("%d,%d", msg.Code, msg.Size), + } + self.feed.Send(event) + return msg, nil +} + +func (self *MsgReporterRW) WriteMsg(msg Msg) error { + err := self.MsgReadWriter.WriteMsg(msg) + if err != nil { + return err + } + event := PeerEvent{ + Type: PeerEventTypeMsgSend, + Peer: self.peerid, + Label: fmt.Sprintf("%d,%d", msg.Code, msg.Size), + } + self.feed.Send(event) + return nil +} + +func (self *MsgReporterRW) Close() error { + if self.closefunc != nil { + return self.closefunc() + } + return nil +} diff --git a/p2p/peer.go b/p2p/peer.go index 49126343c1fa..c815af1f0beb 100644 --- a/p2p/peer.go +++ b/p2p/peer.go @@ -71,6 +71,14 @@ const ( // PeerEventTypeDrop is the type of event emitted when a peer is // dropped from a p2p.Server PeerEventTypeDrop PeerEventType = "drop" + + // PeerEventTypeMsgSend is the type of event emitted when a + // message is successfully sent to a peer + PeerEventTypeMsgSend PeerEventType = "msgsend" + + // PeerEventTypeMsgSend is the type of event emitted when a + // message is successfully sent to a peer + PeerEventTypeMsgRecv PeerEventType = "msgrecv" ) // PeerEvent is an event emitted when peers are either added or dropped from @@ -78,6 +86,7 @@ const ( type PeerEvent struct { Type PeerEventType Peer discover.NodeID + Label string } // Peer represents a connected remote node. @@ -381,6 +390,10 @@ func (rw *protoRW) ReadMsg() (Msg, error) { } } +func (rw *protoRW) Close() error { + return nil +} + // PeerInfo represents a short summary of the information known about a connected // peer. Sub-protocol independent fields are contained and initialized here, with // protocol specifics delegated to all connected sub-protocols. diff --git a/p2p/server.go b/p2p/server.go index bba0c54a0892..1f22c479fce6 100644 --- a/p2p/server.go +++ b/p2p/server.go @@ -142,7 +142,7 @@ type Server interface { Stop() error AddPeer(node *discover.Node) RemovePeer(node *discover.Node) - SubscribePeers(ch chan *PeerEvent) event.Subscription + SubscribeEvents(ch chan *PeerEvent) event.Subscription PeerCount() int NodeInfo() *NodeInfo PeersInfo() []*PeerInfo @@ -309,7 +309,7 @@ func (srv *server) RemovePeer(node *discover.Node) { } // SubscribePeers subscribes the given channel to peer events -func (srv *server) SubscribePeers(ch chan *PeerEvent) event.Subscription { +func (srv *server) SubscribeEvents(ch chan *PeerEvent) event.Subscription { return srv.peerFeed.Subscribe(ch) } diff --git a/p2p/testing/protocolsession.go b/p2p/testing/protocolsession.go index 36705d3c19ac..7594be0745b2 100644 --- a/p2p/testing/protocolsession.go +++ b/p2p/testing/protocolsession.go @@ -79,7 +79,7 @@ func (self *ProtocolSession) trigger(trig Trigger) error { if peer == nil { panic(fmt.Sprintf("trigger: peer %v does not exist (1- %v)", trig.Peer, len(self.Ids))) } - if peer.MsgPipeRW == nil { + if peer.MsgReadWriteCloser == nil { return fmt.Errorf("trigger: peer %v unreachable", trig.Peer) } errc := make(chan error) @@ -112,7 +112,7 @@ func (self *ProtocolSession) expect(exp Expect) error { if peer == nil { panic(fmt.Sprintf("expect: peer %v does not exist (1- %v)", exp.Peer, len(self.Ids))) } - if peer.MsgPipeRW == nil { + if peer.MsgReadWriteCloser== nil { return fmt.Errorf("trigger: peer %v unreachable", exp.Peer) } @@ -247,7 +247,7 @@ func (self *ProtocolSession) TestDisconnected(disconnects ...*Disconnect) error func (self *ProtocolSession) Stop() { for _, id := range self.Ids { p := self.GetPeer(id) - if p != nil && p.MsgPipeRW != nil { + if p != nil && p.MsgReadWriteCloser != nil { p.Close() } } diff --git a/swarm/api/ws/server.go b/swarm/api/ws/server.go deleted file mode 100644 index 71cac8c3cd23..000000000000 --- a/swarm/api/ws/server.go +++ /dev/null @@ -1,54 +0,0 @@ -package ws - -import ( - "net" - - "github.com/ethereum/go-ethereum/logger" - "github.com/ethereum/go-ethereum/logger/glog" - "github.com/ethereum/go-ethereum/rpc" -) - -// Server is the basic configuration needs for the HTTP server and also -// includes CORS settings. -type Server struct { - //Addr string - CorsString string - Endpoint string -} - -// startWS initializes and starts the websocket RPC endpoint. -func StartWSServer(apis []rpc.API, server *Server) error { - - // Generate the whitelist based on the allowed modules - /*whitelist := make(map[string]bool) - for _, module := range modules { - whitelist[module] = true - }*/ - // Register all the APIs exposed by the services - handler := rpc.NewServer() - for _, api := range apis { - //if whitelist[api.Namespace] || (len(whitelist) == 0 && api.Public) { - if err := handler.RegisterName(api.Namespace, api.Service); err != nil { - return err - } - glog.V(logger.Debug).Infof("WebSocket registered %T under '%s'", api.Service, api.Namespace) - //} - } - // All APIs registered, start the HTTP listener - var ( - listener net.Listener - err error - ) - if listener, err = net.Listen("tcp", server.Endpoint); err != nil { - return err - } - rpc.NewWSServer(server.CorsString, handler).Serve(listener) - glog.V(logger.Info).Infof("WebSocket endpoint opened: ws://%s", server.Endpoint) - - // All listeners booted successfully - //n.wsEndpoint = endpoint - //n.wsListener = listener - //n.wsHandler = handler - - return nil -} diff --git a/swarm/api/ws/server_test.go b/swarm/api/ws/server_test.go deleted file mode 100644 index 56f2aa9fe2bb..000000000000 --- a/swarm/api/ws/server_test.go +++ /dev/null @@ -1,62 +0,0 @@ -package ws - -import ( - "context" - "testing" - "time" - - "github.com/ethereum/go-ethereum/logger" - "github.com/ethereum/go-ethereum/logger/glog" - "github.com/ethereum/go-ethereum/rpc" -) - -func init() { - glog.SetV(logger.Detail) - glog.SetToStderr(true) -} - -type TestResult struct { - Foo string `json:"foo"` -} - -func TestStartWSServer(t *testing.T) { - ep := "localhost:8099" - server := &Server{ - Endpoint: ep, - CorsString: "*", - } - apis := []rpc.API{ - { - Namespace: "pss", - Version: "0.1", - Service: makeFakeAPIHandler(), - Public: true, - }, - } - go func() { - err := StartWSServer(apis, server) - t.Logf("wsserver exited: %v", err) - }() - - time.Sleep(time.Second) - - client, err := rpc.DialWebsocket(context.Background(), "ws://" + ep, "ws://localhost") - if err != nil { - t.Fatalf("could not connect: %v", err) - } else { - t.Logf("client: %v", client) - client.Call(&TestResult{}, "pss_test") - } - -} - -func makeFakeAPIHandler() *FakeAPIHandler { - return &FakeAPIHandler{} -} - -type FakeAPIHandler struct { -} - -func (self *FakeAPIHandler) Test() { - glog.V(logger.Detail).Infof("in fakehandler Test()") -}