From eab6b511270dca632226b09491dde25e0e5f5fd6 Mon Sep 17 00:00:00 2001 From: nolash Date: Mon, 6 Feb 2017 23:44:45 +0100 Subject: [PATCH] p2p/simulations: improvement and adding msg events * Conceals rw in messenger type * Adds generic remote control for nodes in network sim * Adds msg type and event to sim, journal * Msg http output, http options handler * Msg event added to "cytoscape" for output to visualizer * Msg event structure changed to fit "cytodata" logic * Fix for handling CORS/PUT requests: * fallthrough OPTIONS method handler * Access-Control-Allow-Methods header added (note that the servermux panics if sent empty method value through http) --- p2p/adapters/inproc.go | 41 +++++------ p2p/adapters/msgpipes.go | 28 +++++--- p2p/adapters/rlpx.go | 38 ++++++---- p2p/adapters/types.go | 33 ++++++++- p2p/protocols/protocol.go | 43 +++++++---- p2p/protocols/protocol_test.go | 2 +- p2p/simulations/cytoscape.go | 22 ++++-- p2p/simulations/network.go | 56 ++++++++++++++- p2p/simulations/rest_api_server.go | 1 + p2p/simulations/session_controller.go | 83 +++++++++++++++++++++- p2p/simulations/session_controller_test.go | 64 ++++++++++++++++- p2p/testing/exchange.go | 28 ++++---- p2p/testing/sessions.go | 5 +- swarm/network/protocol.go | 43 ++++++++++- 14 files changed, 402 insertions(+), 85 deletions(-) diff --git a/p2p/adapters/inproc.go b/p2p/adapters/inproc.go index c741b81dc072..822446dd5217 100644 --- a/p2p/adapters/inproc.go +++ b/p2p/adapters/inproc.go @@ -25,16 +25,17 @@ import ( "github.com/ethereum/go-ethereum/p2p/discover" ) -func newPeer(rw p2p.MsgReadWriter) *Peer { +func newPeer(m Messenger) *Peer { return &Peer{ - RW: rw, + Messenger: m, Errc: make(chan error, 1), Flushc: make(chan bool), } } type Peer struct { - RW p2p.MsgReadWriter + //RW p2p.MsgReadWriter + Messenger Errc chan error Flushc chan bool } @@ -51,17 +52,17 @@ type SimNode struct { lock sync.RWMutex Id *NodeId network Network - messenger Messenger + messenger func(p2p.MsgReadWriter) Messenger peerMap map[discover.NodeID]int peers []*Peer Run ProtoCall } -func (self *SimNode) Messenger() Messenger { - return self.messenger +func (self *SimNode) Messenger(rw p2p.MsgReadWriter) Messenger { + return self.messenger(rw) } -func NewSimNode(id *NodeId, n Network, m Messenger) *SimNode { +func NewSimNode(id *NodeId, n Network, m func(p2p.MsgReadWriter) Messenger) *SimNode { return &SimNode{ Id: id, network: n, @@ -95,24 +96,24 @@ func (self *SimNode) getPeer(id *NodeId) *Peer { func (self *SimNode) SetPeer(id *NodeId, rw p2p.MsgReadWriter) { self.lock.Lock() defer self.lock.Unlock() - self.setPeer(id, rw) + self.setPeer(id, self.Messenger(rw)) } -func (self *SimNode) setPeer(id *NodeId, rw p2p.MsgReadWriter) *Peer { +func (self *SimNode) setPeer(id *NodeId, m Messenger) *Peer { i, found := self.peerMap[id.NodeID] if !found { i = len(self.peers) self.peerMap[id.NodeID] = i - p := newPeer(rw) + p := newPeer(m) self.peers = append(self.peers, p) return p } - if self.peers[i] != nil && rw != nil { + if self.peers[i] != nil && m != nil { panic(fmt.Sprintf("pipe for %v already set", id)) } // legit reconnect reset disconnection error, p := self.peers[i] - p.RW = rw + p.Messenger = m return p } @@ -121,11 +122,11 @@ func (self *SimNode) Disconnect(rid []byte) error { defer self.lock.Unlock() id := NewNodeId(rid) peer := self.getPeer(id) - if peer == nil || peer.RW == nil { + if peer == nil || peer.Messenger == nil { return fmt.Errorf("already disconnected") } - peer.RW.(*p2p.MsgPipeRW).Close() - peer.RW = nil + peer.Messenger.Close() + peer.Messenger = nil // na := self.network.GetNodeAdapter(id) // peer = na.(*SimNode).GetPeer(self.Id) // peer.RW = nil @@ -146,12 +147,12 @@ func (self *SimNode) Connect(rid []byte) error { defer close(runc) // run protocol on remote node with self as peer - err := na.(*SimNode).runProtocol(self.Id, rrw, rw, runc) + err := na.(ProtocolRunner).RunProtocol(self.Id, rrw, rw, runc) if err != nil { return fmt.Errorf("cannot run protocol (%v -> %v) %v", self.Id, id, err) } // run protocol on remote node with self as peer - err = self.runProtocol(id, rw, rrw, runc) + err = self.RunProtocol(id, rw, rrw, runc) if err != nil { return fmt.Errorf("cannot run protocol (%v -> %v): %v", id, self.Id, err) } @@ -159,17 +160,17 @@ func (self *SimNode) Connect(rid []byte) error { return nil } -func (self *SimNode) runProtocol(id *NodeId, rw, rrw p2p.MsgReadWriter, runc chan bool) error { +func (self *SimNode) RunProtocol(id *NodeId, rw, rrw p2p.MsgReadWriter, runc chan bool) error { if self.Run == nil { glog.V(6).Infof("no protocol starting on peer %v (connection with %v)", self.Id, id) return nil } glog.V(6).Infof("protocol starting on peer %v (connection with %v)", self.Id, id) peer := self.getPeer(id) - if peer != nil && peer.RW != nil { + if peer != nil && peer.Messenger != nil { return fmt.Errorf("already connected %v to peer %v", self.Id, id) } - peer = self.setPeer(id, rrw) + peer = self.setPeer(id, self.Messenger(rrw)) p := p2p.NewPeer(id.NodeID, Name(id.Bytes()), []p2p.Cap{}) go func() { err := self.Run(p, rw) diff --git a/p2p/adapters/msgpipes.go b/p2p/adapters/msgpipes.go index f9416e4fdc14..3c8be82a3300 100644 --- a/p2p/adapters/msgpipes.go +++ b/p2p/adapters/msgpipes.go @@ -31,20 +31,30 @@ import ( // peer session test // ExpectMsg(p2p.MsgReader, uint64, interface{}) error // SendMsg(p2p.MsgWriter, uint64, interface{}) error -type SimPipe struct{} +type SimPipe struct{ + rw p2p.MsgReadWriter +} + +func (self *SimPipe) SendMsg(code uint64, msg interface{}) error { + return p2p.Send(self.rw, code, msg) +} + +func (self *SimPipe) ReadMsg() (p2p.Msg, error) { + return self.rw.ReadMsg() +} -func (*SimPipe) SendMsg(w p2p.MsgWriter, code uint64, msg interface{}) error { - return p2p.Send(w, code, msg) +func (self *SimPipe) TriggerMsg(code uint64, msg interface{}) error { + return p2p.Send(self.rw, code, msg) } -func (*SimPipe) ReadMsg(r p2p.MsgReader) (p2p.Msg, error) { - return r.ReadMsg() +func (self *SimPipe) ExpectMsg(code uint64, msg interface{}) error { + return p2p.ExpectMsg(self.rw, code, msg) } -func (*SimPipe) TriggerMsg(w p2p.MsgWriter, code uint64, msg interface{}) error { - return p2p.Send(w, code, msg) +func (self *SimPipe) Close() { + self.rw.(*p2p.MsgPipeRW).Close() } -func (*SimPipe) ExpectMsg(r p2p.MsgReader, code uint64, msg interface{}) error { - return p2p.ExpectMsg(r, code, msg) +func NewSimPipe(rw p2p.MsgReadWriter) Messenger { + return Messenger(&SimPipe{rw}) } diff --git a/p2p/adapters/rlpx.go b/p2p/adapters/rlpx.go index ffa78664a7d4..1adf3e04583a 100644 --- a/p2p/adapters/rlpx.go +++ b/p2p/adapters/rlpx.go @@ -19,6 +19,7 @@ package adapters import ( "fmt" "net" + //"encoding/binary" "github.com/ethereum/go-ethereum/p2p" "github.com/ethereum/go-ethereum/p2p/discover" @@ -30,17 +31,19 @@ type RLPx struct { id *NodeId net *p2p.Server addr []byte - m Messenger + m func(p2p.MsgReadWriter) Messenger r Reporter } type RLPxMessenger struct { + rw p2p.MsgReadWriter } -func NewRLPx(addr []byte, srv *p2p.Server, m Messenger) *RLPx { - if m == nil { - m = &RLPxMessenger{} - } +func NewRLPxMessenger(rw p2p.MsgReadWriter) Messenger { + return Messenger(&RLPxMessenger{rw: rw}) +} + +func NewRLPx(addr []byte, srv *p2p.Server, m func(p2p.MsgReadWriter) Messenger) *RLPx { return &RLPx{ net: srv, addr: addr, @@ -48,7 +51,7 @@ func NewRLPx(addr []byte, srv *p2p.Server, m Messenger) *RLPx { } } -func NewReportingRLPx(addr []byte, srv *p2p.Server, m Messenger, r Reporter) *RLPx { +func NewReportingRLPx(addr []byte, srv *p2p.Server, m func(p2p.MsgReadWriter) Messenger, r Reporter) *RLPx { rlpx := NewRLPx(addr, srv, m) rlpx.r = r srv.PeerConnHook = func(p *p2p.Peer) { @@ -60,12 +63,16 @@ func NewReportingRLPx(addr []byte, srv *p2p.Server, m Messenger, r Reporter) *RL return rlpx } -func (RLPxMessenger) SendMsg(w p2p.MsgWriter, code uint64, msg interface{}) error { - return p2p.Send(w, code, msg) +func (self *RLPxMessenger) SendMsg(code uint64, msg interface{}) error { + return p2p.Send(self.rw, code, msg) } -func (RLPxMessenger) ReadMsg(r p2p.MsgReader) (p2p.Msg, error) { - return r.ReadMsg() +func (self *RLPxMessenger) ReadMsg() (p2p.Msg, error) { + return self.rw.ReadMsg() +} + +func (self *RLPxMessenger) Close() { + } func (self *RLPx) LocalAddr() []byte { @@ -83,12 +90,15 @@ func (self *RLPx) Connect(enode []byte) error { return nil } -func (self *RLPx) Messenger() Messenger { - return self.m +func (self *RLPx) Messenger(rw p2p.MsgReadWriter) Messenger { + return self.m(rw) } -func (self *RLPx) Disconnect(p *p2p.Peer, rw p2p.MsgReadWriter) error { - p.Disconnect(p2p.DiscSubprotocolError) +//func (self *RLPx) Disconnect(p *p2p.Peer, rw p2p.MsgReadWriter) error { +func (self *RLPx) Disconnect(b []byte) error { + //p.Disconnect(p2p.DiscSubprotocolError) + //d, _ := binary.Uvarint(b) + //p.Disconnect(p2p.DiscReason(d)) return nil } diff --git a/p2p/adapters/types.go b/p2p/adapters/types.go index c5e62f416af2..681e0948eb64 100644 --- a/p2p/adapters/types.go +++ b/p2p/adapters/types.go @@ -17,6 +17,7 @@ package adapters import ( + "github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/p2p" "github.com/ethereum/go-ethereum/p2p/discover" ) @@ -63,8 +64,9 @@ func (self *NodeId) Label() string { } type Messenger interface { - SendMsg(p2p.MsgWriter, uint64, interface{}) error - ReadMsg(p2p.MsgReader) (p2p.Msg, error) + SendMsg(uint64, interface{}) error + ReadMsg() (p2p.Msg, error) + Close() } type NodeAdapter interface { @@ -73,7 +75,12 @@ type NodeAdapter interface { // Disconnect(*p2p.Peer, p2p.MsgReadWriter) LocalAddr() []byte ParseAddr([]byte, string) ([]byte, error) - Messenger() Messenger + // Messenger() Messenger <<<... old version + Messenger(p2p.MsgReadWriter) Messenger +} + +type ProtocolRunner interface { + RunProtocol(id *NodeId, rw, rrw p2p.MsgReadWriter, runc chan bool) error } type StartAdapter interface { @@ -85,3 +92,23 @@ type Reporter interface { DidConnect(*NodeId, *NodeId) error DidDisconnect(*NodeId, *NodeId) error } + + +func RandomNodeId() *NodeId { + key, err := crypto.GenerateKey() + if err != nil { + panic("unable to generate key") + } + var id discover.NodeID + pubkey := crypto.FromECDSAPub(&key.PublicKey) + copy(id[:], pubkey[1:]) + return &NodeId{id} +} + +func RandomNodeIds(n int) []*NodeId { + var ids []*NodeId + for i := 0; i < n; i++ { + ids = append(ids, RandomNodeId()) + } + return ids +} diff --git a/p2p/protocols/protocol.go b/p2p/protocols/protocol.go index 06a51272bf84..ffcb9b256834 100644 --- a/p2p/protocols/protocol.go +++ b/p2p/protocols/protocol.go @@ -122,6 +122,11 @@ type CodeMap struct { messages map[reflect.Type]uint64 // index of types to codes, for sending by type } +func (self *CodeMap) GetCode(msg interface{}) (uint64, bool) { + code, found := self.messages[reflect.TypeOf(msg)] + return code, found +} + func NewCodeMap(name string, version uint, maxMsgSize int, msgs ...interface{}) *CodeMap { self := &CodeMap{ Name: name, @@ -133,10 +138,6 @@ func NewCodeMap(name string, version uint, maxMsgSize int, msgs ...interface{}) return self } -func (self *CodeMap) GetCode(msg interface{}) uint64 { - return self.messages[reflect.TypeOf(msg)] -} - func (self *CodeMap) Length() uint64 { return uint64(len(self.codes)) } @@ -157,6 +158,26 @@ func (self *CodeMap) Register(msgs ...interface{}) { } } +func NewProtocol(protocolname string, protocolversion uint, run func(*Peer) error, na adapters.NodeAdapter, ct *CodeMap) *p2p.Protocol { + + r := func(p *p2p.Peer, rw p2p.MsgReadWriter) error { + + m := na.Messenger(rw) + + peer := NewPeer(p, ct, m, func() {}) + + return run(peer) + + } + + return &p2p.Protocol{ + Name: protocolname, + Version: protocolversion, + Length: ct.Length(), + Run: r, + } +} + // A Peer represents a remote peer or protocol instance that is running on a peer connection with // a remote peer type Peer struct { @@ -172,12 +193,11 @@ type Peer struct { // this constructor is called by the p2p.Protocol#Run function // the first two arguments are comming the arguments passed to p2p.Protocol.Run function // the third argument is the CodeMap describing the protocol messages and options -func NewPeer(p *p2p.Peer, rw p2p.MsgReadWriter, ct *CodeMap, m adapters.Messenger, disconn func()) *Peer { +func NewPeer(p *p2p.Peer, ct *CodeMap, m adapters.Messenger, disconn func()) *Peer { return &Peer{ ct: ct, m: m, Peer: p, - rw: rw, handlers: make(map[reflect.Type][]func(interface{}) error), disconnect: disconn, } @@ -229,13 +249,12 @@ func (self *Peer) Drop() { // this low level call will be wrapped by libraries providing routed or broadcast sends // but often just used to forward and push messages to directly connected peers func (self *Peer) Send(msg interface{}) error { - typ := reflect.TypeOf(msg) - code, found := self.ct.messages[typ] + code, found := self.ct.GetCode(msg) if !found { - return errorf(ErrInvalidMsgType, "%v", typ) + return errorf(ErrInvalidMsgType, "%v", code) } - glog.V(logger.Debug).Infof("=> %v %v (%d)", msg, typ, code) - err := self.m.SendMsg(self.rw, uint64(code), msg) + glog.V(logger.Debug).Infof("=> %v (%d)", msg, code) + err := self.m.SendMsg(uint64(code), msg) if err != nil { self.Drop() return errorf(ErrWrite, "(msg code: %v): %v", code, err) @@ -249,7 +268,7 @@ func (self *Peer) Send(msg interface{}) error { // checks message size, out-of-range message codes, handles decoding with reflection, // call handlers as callback onside func (self *Peer) handleIncoming() (interface{}, error) { - msg, err := self.m.ReadMsg(self.rw) + msg, err := self.m.ReadMsg() if err != nil { return nil, err } diff --git a/p2p/protocols/protocol_test.go b/p2p/protocols/protocol_test.go index 60470c041cdc..71f0291cb207 100644 --- a/p2p/protocols/protocol_test.go +++ b/p2p/protocols/protocol_test.go @@ -64,7 +64,7 @@ func newProtocol(pp *p2ptest.TestPeerPool, wg *sync.WaitGroup) func(adapters.Nod wg.Add(1) } id := &adapters.NodeId{p.ID()} - peer := NewPeer(p, rw, ct, na.Messenger(), func() { na.Disconnect(id.Bytes()) }) + peer := NewPeer(p, ct, na.Messenger(rw), func() { na.Disconnect(id.Bytes()) }) // demonstrates use of peerPool, killing another peer connection as a response to a message peer.Register(&kill{}, func(msg interface{}) error { diff --git a/p2p/simulations/cytoscape.go b/p2p/simulations/cytoscape.go index a24c797f24fb..37eddd0ab695 100644 --- a/p2p/simulations/cytoscape.go +++ b/p2p/simulations/cytoscape.go @@ -28,13 +28,15 @@ type CyElement struct { } type CyUpdate struct { - Add []*CyElement `json:"add"` - Remove []string `json:"remove"` + Add []*CyElement `json:"add"` + Remove []string `json:"remove"` + Message []string `json:"message"` } func UpdateCy(conf *CyConfig, j *Journal) (*CyUpdate, error) { added := []*CyElement{} removed := []string{} + messaged := []string{} var el *CyElement update := func(e *event.Event) bool { entry := e.Data @@ -42,6 +44,14 @@ func UpdateCy(conf *CyConfig, j *Journal) (*CyUpdate, error) { if ev, ok := entry.(*NodeEvent); ok { el = &CyElement{Group: "nodes", Data: &CyData{Id: ev.node.Id.Label()}} action = ev.Action + } else if ev, ok := entry.(*MsgEvent); ok { + msg := ev.msg + id := ConnLabel(msg.One, msg.Other) + var source, target string + source = msg.One.Label() + target = msg.Other.Label() + el = &CyElement{Group: "msgs", Data: &CyData{Id: id, Source: source, Target: target}} + action = ev.Action } else if ev, ok := entry.(*ConnEvent); ok { // mutually exclusive directed edge (caller -> callee) conn := ev.conn @@ -67,6 +77,9 @@ func UpdateCy(conf *CyConfig, j *Journal) (*CyUpdate, error) { case "down": el.Data.Up = false removed = append(removed, el.Data.Id) + case "msg": + el.Data.Up = true + messaged = append(messaged, el.Data.Id) default: panic("unknown action") } @@ -75,7 +88,8 @@ func UpdateCy(conf *CyConfig, j *Journal) (*CyUpdate, error) { j.Read(update) return &CyUpdate{ - Add: added, - Remove: removed, + Add: added, + Remove: removed, + Message: messaged, }, nil } diff --git a/p2p/simulations/network.go b/p2p/simulations/network.go index 256aff33ddc8..0d19d867ca83 100644 --- a/p2p/simulations/network.go +++ b/p2p/simulations/network.go @@ -33,10 +33,15 @@ import ( "github.com/ethereum/go-ethereum/event" "github.com/ethereum/go-ethereum/logger/glog" + "github.com/ethereum/go-ethereum/p2p" "github.com/ethereum/go-ethereum/p2p/adapters" "github.com/ethereum/go-ethereum/p2p/discover" ) +type NetworkQuery struct { + Type string +} + type NetworkConfig struct { // Type NetworkType // Config json.RawMessage // type-specific configs @@ -47,7 +52,7 @@ type NetworkConfig struct { // event types related to connectivity, i.e., nodes coming on dropping off // and connections established and dropped -var ConnectivityEvents = []interface{}{&NodeEvent{}, &ConnEvent{}} +var ConnectivityEvents = []interface{}{&NodeEvent{}, &ConnEvent{}, &MsgEvent{}} // NewNetworkController creates a ResourceController responding to GET and DELETE methods // it embeds a mockers controller, a journal player, node and connection contollers. @@ -55,6 +60,7 @@ var ConnectivityEvents = []interface{}{&NodeEvent{}, &ConnEvent{}} // Events from the eventer go into the provided journal. The content of the journal can be // accessed through the HTTP API. func NewNetworkController(conf *NetworkConfig, eventer *event.TypeMux, journal *Journal) Controller { + self := NewResourceContoller( &ResourceHandlers{ // GET // @@ -103,6 +109,7 @@ type Network struct { connMap map[string]int Nodes []*Node `json:"nodes"` Conns []*Conn `json:"conns"` + messenger func(p2p.MsgReadWriter) adapters.Messenger // // adapters.Messenger // node adapter function that creates the node model for @@ -116,6 +123,7 @@ func NewNetwork(triggers, events *event.TypeMux) *Network { events: events, nodeMap: make(map[discover.NodeID]int), connMap: make(map[string]int), + messenger: adapters.NewSimPipe, } } @@ -155,6 +163,12 @@ type ConnEvent struct { conn *Conn } +type MsgEvent struct { + Action string + Type string + msg *Msg +} + func (self *ConnEvent) String() string { return fmt.Sprintf("\n", self.Action, self.Type, self.conn) } @@ -163,6 +177,10 @@ func (self *NodeEvent) String() string { return fmt.Sprintf("\n", self.Action, self.Type, self.node) } +func (self *MsgEvent) String() string { + return fmt.Sprintf("\n", self.Action, self.Type, self.msg) +} + func (self *Node) event(up bool) *NodeEvent { var action string if up { @@ -210,6 +228,25 @@ func (self *Conn) event(up, rev bool) *ConnEvent { } } +type Msg struct { + One *adapters.NodeId `json:"one"` + Other *adapters.NodeId `json:"other"` + Code uint64 `json:"conn"` +} + +func (self *Msg) String() string { + return fmt.Sprintf("Msg(%d) %v->%v", self.Code, self.One.Label(), self.Other.Label()) +} + +func (self *Msg) event() *MsgEvent { + return &MsgEvent{ + Action: "up", + //Type: fmt.Sprintf("%d", self.Code), + Type: "msg", + msg: self, + } +} + type NodeConfig struct { Id *adapters.NodeId `json:"Id"` } @@ -253,6 +290,12 @@ func (self *Network) NewNode(conf *NodeConfig) error { return nil } +func (self *Network) NewGenericSimNode(conf *NodeConfig) adapters.NodeAdapter { + id := conf.Id + na := adapters.NewSimNode(id, self, self.messenger) + return na +} + // newConn adds a new connection to the network // it errors if the respective nodes do not exist func (self *Network) newConn(oneId, otherId *adapters.NodeId) (*Conn, error) { @@ -440,6 +483,17 @@ func (self *Network) DidDisconnect(one, other *adapters.NodeId) error { return nil } +// Send(senderid, receiverid) sends a message from one node to another +func (self *Network) Send(senderid, receiverid *adapters.NodeId, msgcode uint64, protomsg interface{}) { + msg := &Msg{ + One: senderid, + Other: receiverid, + Code: msgcode, + } + //self.GetNode(senderid).na.(*adapters.SimNode).GetPeer(receiverid).SendMsg(msgcode, protomsg) // phew! + self.events.Post(msg.event()) // should also include send status maybe +} + // GetNodeAdapter(id) returns the NodeAdapter for node with id // returns nil if node does not exist func (self *Network) GetNodeAdapter(id *adapters.NodeId) adapters.NodeAdapter { diff --git a/p2p/simulations/rest_api_server.go b/p2p/simulations/rest_api_server.go index ed99ed44c926..23219c660862 100644 --- a/p2p/simulations/rest_api_server.go +++ b/p2p/simulations/rest_api_server.go @@ -38,6 +38,7 @@ func handle(w http.ResponseWriter, r *http.Request, c Controller) { uri := requestURL.Path w.Header().Set("Content-Type", "text/json") w.Header().Set("Access-Control-Allow-Origin", "*") + w.Header().Set("Access-Control-Allow-Methods", "GET, POST, PUT, DELETE, OPTIONS") defer r.Body.Close() parts := strings.Split(uri, "/") var err error diff --git a/p2p/simulations/session_controller.go b/p2p/simulations/session_controller.go index ad6ef0c098f3..37818ad1088c 100644 --- a/p2p/simulations/session_controller.go +++ b/p2p/simulations/session_controller.go @@ -13,6 +13,7 @@ import ( "github.com/ethereum/go-ethereum/event" "github.com/ethereum/go-ethereum/logger/glog" "github.com/ethereum/go-ethereum/p2p/adapters" + ) type returnHandler func(body io.Reader) (resp io.ReadSeeker, err error) @@ -34,6 +35,16 @@ type ResourceController struct { *ResourceHandlers } +type NodeResult struct { + Nodes []*Node +} + +type NodeIF struct { + One uint + Other uint + MessageType uint8 +} + var methodsAvailable = []string{"POST", "GET", "PUT", "DELETE"} func (self *ResourceHandlers) handler(method string) *ResourceHandler { @@ -47,7 +58,14 @@ func (self *ResourceHandlers) handler(method string) *ResourceHandler { h = self.Update case "DELETE": h = self.Destroy + case "OPTIONS": + h = &ResourceHandler{ + Handle: func(msg interface{}, c *ResourceController) (interface{}, error) { + return struct{}{}, nil + }, + } } + return h } @@ -75,7 +93,10 @@ func NewSessionController() (*ResourceController, chan bool) { Create: &ResourceHandler{ Handle: func(msg interface{}, parent *ResourceController) (interface{}, error) { conf := msg.(*NetworkConfig) - m := NewNetworkController(conf, &event.TypeMux{}, NewJournal()) + journal := NewJournal() + net := NewNetwork(nil, &event.TypeMux{}) + net.SetNaf(net.NewGenericSimNode) + m := NewNetworkController(conf, net.Events(), journal) if len(conf.Id) == 0 { conf.Id = fmt.Sprintf("%d", parent.id) } @@ -84,6 +105,66 @@ func NewSessionController() (*ResourceController, chan bool) { parent.SetResource(conf.Id, m) } parent.id++ + + m.SetResource("debug", NewResourceContoller( + &ResourceHandlers{ + Create: &ResourceHandler{ + Handle: func(msg interface{}, parent *ResourceController) (interface{}, error) { + journaldump := []string{} + eventfmt := func(e *event.Event) bool { + journaldump = append(journaldump, fmt.Sprintf("%v", e)) + return true + } + journal.Read(eventfmt) + return struct{Results []string}{Results: journaldump,}, nil + }, + }, + }, + )) + + m.SetResource("node", NewResourceContoller( + &ResourceHandlers{ + Create: &ResourceHandler{ + Handle: func(msg interface{}, parent *ResourceController) (interface{}, error) { + var nodeid *adapters.NodeId + + nodeid = adapters.RandomNodeId() + + net.NewNode(&NodeConfig{Id: nodeid}) + glog.V(6).Infof("added node %v to network %v", nodeid, net) + + return &NodeConfig{Id: nodeid}, nil + + }, + }, + Retrieve: &ResourceHandler{ + Handle: func(msg interface{}, parent *ResourceController) (interface{}, error) { + return &NodeResult{Nodes: net.Nodes}, nil + }, + }, + Update: &ResourceHandler{ + Handle: func(msg interface{}, parent *ResourceController) (interface{}, error) { + var othernode *Node + + args := msg.(*NodeIF) + onenode := net.Nodes[args.One - 1] + + if args.Other == 0 { + if net.Start(onenode.Id) != nil { + net.Stop(onenode.Id) + } + return &NodeResult{Nodes: []*Node{onenode}}, nil + } else { + othernode = net.Nodes[args.Other - 1] + net.Connect(onenode.Id, othernode.Id) + return &NodeResult{Nodes: []*Node{onenode, othernode}}, nil + } + }, + Type: reflect.TypeOf(&NodeIF{}), // this is input not output param structure + }, + }, + )) + return empty, nil }, Type: reflect.TypeOf(&NetworkConfig{}), diff --git a/p2p/simulations/session_controller_test.go b/p2p/simulations/session_controller_test.go index 1505ea65e4be..4e0f0f9cb369 100644 --- a/p2p/simulations/session_controller_test.go +++ b/p2p/simulations/session_controller_test.go @@ -8,12 +8,16 @@ import ( "net/http" "testing" "time" + "encoding/json" "github.com/ethereum/go-ethereum/event" "github.com/ethereum/go-ethereum/logger/glog" "github.com/ethereum/go-ethereum/p2p/adapters" ) +/*** + * \todo rewrite this with a scripting engine to do http protocol xchanges more easily + */ const ( domain = "http://localhost" port = "8888" @@ -55,8 +59,61 @@ func TestDelete(t *testing.T) { } func TestCreate(t *testing.T) { - // should test that session controller POST creates network controller - // with proper endpoints + s, err := json.Marshal(&struct{Id string}{Id: "testnetwork"}) + req, err := http.NewRequest("POST", domain + ":" + port, bytes.NewReader(s)) + if err != nil { + t.Fatalf("unexpected error creating request: %v", err) + } + resp, err := (&http.Client{}).Do(req) + if err != nil { + t.Fatalf("unexpected error on http.Client request: %v", err) + } + req, err = http.NewRequest("POST", domain + ":" + port + "/testnetwork/debug/", nil) + if err != nil { + t.Fatalf("unexpected error creating request: %v", err) + } + resp, err = (&http.Client{}).Do(req) + if err != nil { + t.Fatalf("unexpected error on http.Client request: %v", err) + } + body, err := ioutil.ReadAll(resp.Body) + if err != nil { + t.Fatalf("error reading response body: %v", err) + } + t.Logf("%s", body) +} + +func TestNodes(t *testing.T) { + networkname := "testnetworkfornodes" + + s, err := json.Marshal(&struct{Id string}{Id: networkname}) + req, err := http.NewRequest("POST", domain + ":" + port, bytes.NewReader(s)) + if err != nil { + t.Fatalf("unexpected error creating request: %v", err) + } + resp, err := (&http.Client{}).Do(req) + if err != nil { + t.Fatalf("unexpected error on http.Client request: %v", err) + } + for i := 0; i < 3; i++ { + req, err = http.NewRequest("POST", domain + ":" + port + "/" + networkname + "/node/", nil) + if err != nil { + t.Fatalf("unexpected error creating request: %v", err) + } + resp, err = (&http.Client{}).Do(req) + if err != nil { + t.Fatalf("unexpected error on http.Client request: %v", err) + } + t.Logf("%s", resp) + } + /* + body, err := ioutil.ReadAll(resp.Body) + if err != nil { + t.Fatalf("error reading response body: %v", err) + } + + t.Logf("%s", body) + */ } func testResponse(t *testing.T, method, addr string, r io.ReadSeeker) []byte { @@ -104,7 +161,8 @@ func TestUpdate(t *testing.T) { "group": "nodes" } ], - "remove": [] + "remove": [], + "message": [] }` resp := testResponse(t, "GET", url(port, "0"), bytes.NewReader([]byte("{}"))) if string(resp) != exp { diff --git a/p2p/testing/exchange.go b/p2p/testing/exchange.go index 4b0d44650288..9f32f715314e 100644 --- a/p2p/testing/exchange.go +++ b/p2p/testing/exchange.go @@ -11,7 +11,6 @@ import ( "time" "github.com/ethereum/go-ethereum/logger/glog" - "github.com/ethereum/go-ethereum/p2p" "github.com/ethereum/go-ethereum/p2p/adapters" ) @@ -36,8 +35,8 @@ type TestNetAdapter interface { type TestMessenger interface { // MsgPipe([]byte, []byte) p2p,MsgPipe - ExpectMsg(p2p.MsgReader, uint64, interface{}) error - TriggerMsg(p2p.MsgWriter, uint64, interface{}) error + ExpectMsg(uint64, interface{}) error + TriggerMsg(uint64, interface{}) error } // exchanges are the basic units of protocol tests @@ -73,17 +72,18 @@ type Disconnect struct { // it allows for resource-driven scenario testing // disconnect reason errors are written in session.Errs // (correcponding to session.Peers) -func NewExchangeTestSession(t *testing.T, n TestNetAdapter, m TestMessenger, ids []*adapters.NodeId) *ExchangeTestSession { +//func NewExchangeTestSession(t *testing.T, n TestNetAdapter, m TestMessenger, ids []*adapters.NodeId) *ExchangeTestSession { +func NewExchangeTestSession(t *testing.T, n TestNetAdapter, ids []*adapters.NodeId) *ExchangeTestSession { return &ExchangeTestSession{ Ids: ids, TestNetAdapter: n, - TestMessenger: m, t: t, } } type TestPeerInfo struct { - RW p2p.MsgReadWriter + //RW p2p.MsgReadWriter + Messenger TestMessenger Flushc chan bool Errc chan error } @@ -94,15 +94,17 @@ func (self *ExchangeTestSession) trigger(trig Trigger) error { if peer == nil { panic(fmt.Sprintf("trigger: peer %v does not exist (1- %v)", trig.Peer, len(self.Ids))) } - rw := peer.RW - if rw == nil { + //rw := peer.RW + m := peer.Messenger + if m == nil { return fmt.Errorf("trigger: peer %v unreachable", trig.Peer) } errc := make(chan error) go func() { glog.V(6).Infof("trigger....") - errc <- self.TriggerMsg(rw, trig.Code, trig.Msg) + //errc <- self.TriggerMsg(rw, trig.Code, trig.Msg) + errc <- m.(TestMessenger).TriggerMsg(trig.Code, trig.Msg) glog.V(6).Infof("triggered") }() @@ -132,15 +134,17 @@ func (self *ExchangeTestSession) expect(exp Expect) error { if peer == nil { panic(fmt.Sprintf("expect: peer %v does not exist (1- %v)", exp.Peer, len(self.Ids))) } - rw := peer.RW - if rw == nil { + //rw := peer.RW + m := peer.Messenger + if m == nil { return fmt.Errorf("trigger: peer %v unreachable", exp.Peer) } errc := make(chan error) go func() { glog.V(6).Infof("waiting for msg, %v", exp.Msg) - errc <- self.ExpectMsg(rw, exp.Code, exp.Msg) + //errc <- self.ExpectMsg(rw, exp.Code, exp.Msg) + errc <- m.(TestMessenger).ExpectMsg(exp.Code, exp.Msg) }() t := exp.Timeout diff --git a/p2p/testing/sessions.go b/p2p/testing/sessions.go index 8789cbcc7977..13d37511b1ed 100644 --- a/p2p/testing/sessions.go +++ b/p2p/testing/sessions.go @@ -30,7 +30,7 @@ type ExchangeSession struct { // correct message exchange, forwarding, and broadcast // higher level or network behaviour should be tested with network simulators func NewProtocolTester(t *testing.T, id *adapters.NodeId, n int, run func(id adapters.NodeAdapter) adapters.ProtoCall) *ExchangeSession { - simPipe := &adapters.SimPipe{} + simPipe := adapters.NewSimPipe network := simulations.NewNetwork(nil, nil) naf := func(conf *simulations.NodeConfig) adapters.NodeAdapter { na := adapters.NewSimNode(conf.Id, network, simPipe) @@ -49,7 +49,8 @@ func NewProtocolTester(t *testing.T, id *adapters.NodeId, n int, run func(id ada } glog.V(6).Infof("network created") na := network.GetNode(id).Adapter() - s := NewExchangeTestSession(t, na.(TestNetAdapter), na.Messenger().(TestMessenger), nil) + //s := NewExchangeTestSession(t, na.(TestNetAdapter), na.Messenger().(TestMessenger), nil) + s := NewExchangeTestSession(t, na.(TestNetAdapter), nil) self := &ExchangeSession{ network: network, na: na, diff --git a/swarm/network/protocol.go b/swarm/network/protocol.go index aea06ba54d6d..2573fc9b2dd4 100644 --- a/swarm/network/protocol.go +++ b/swarm/network/protocol.go @@ -62,8 +62,8 @@ type Node interface { String() string // pretty printable the Node ID() discover.NodeID // the key that uniquely identifies the Node for the peerPool - Send(interface{}) error // can send messages - Drop() // disconnect this peer + Send(interface{}) error // can send messages + Drop() // disconnect this peer Register(interface{}, func(interface{}) error) uint64 // register message-handler callbacks } @@ -86,9 +86,44 @@ func BzzCodeMap(msgs ...interface{}) *protocols.CodeMap { return ct } +func Bzz(localAddr []byte, hive PeerPool, na adapters.NodeAdapter, ct *protocols.CodeMap, services func(Node) error) *p2p.Protocol { + run := func(p *protocols.Peer) error { + addr := &peerAddr{localAddr, na.LocalAddr()} + + bee := &bzz{Peer: p, hive: hive, network: na, localAddr: addr} + // protocol handshake and its validation + // sets remote peer address + err := bee.bzzHandshake() + if err != nil { + glog.V(6).Infof("handshake error in peer %v: %v", bee.ID(), err) + return err + } + + // mount external service models on the peer connection (swap, sync) + if services != nil { + err = services(bee) + if err != nil { + glog.V(6).Infof("protocol service error for peer %v: %v", bee.ID(), err) + return err + } + } + + err = hive.Add(bee) + if err != nil { + glog.V(6).Infof("failed to add peer '%v' to hive: %v", bee.ID(), err) + return err + } + + defer hive.Remove(bee) + return bee.Run() + } + + return protocols.NewProtocol(ProtocolName, Version, run, na, ct) +} + // Bzz is the protocol constructor // returns p2p.Protocol that is to be offered by the node.Service -func Bzz(localAddr []byte, hive PeerPool, na adapters.NodeAdapter, m adapters.Messenger, ct *protocols.CodeMap, services func(Node) error) *p2p.Protocol { +/*func Bzz(localAddr []byte, hive PeerPool, na adapters.NodeAdapter, m adapters.Messenger, ct *protocols.CodeMap, services func(Node) error) *p2p.Protocol { // handle handshake run := func(p *p2p.Peer, rw p2p.MsgReadWriter) error { @@ -145,6 +180,8 @@ func Bzz(localAddr []byte, hive PeerPool, na adapters.NodeAdapter, m adapters.Me } } +*/ + /* Handshake