diff --git a/cmd/swarm/main.go b/cmd/swarm/main.go
index 012a8c178d96..aaf2e3ad28b6 100644
--- a/cmd/swarm/main.go
+++ b/cmd/swarm/main.go
@@ -116,10 +116,15 @@ var (
 	SwarmUploadMimeType = cli.StringFlag{
 		Name:  "mime",
 		Usage: "force mime type",
+	}
 	PssEnabledFlag = cli.BoolFlag{
 		Name:  "pss",
 		Usage: "Enable pss (message passing over swarm)",
 	}
+	PssPortFlag = cli.IntFlag{
+		Name:  "pssport",
+		Usage: fmt.Sprintf("Websockets port for pss (default %d)", node.DefaultWSPort),
+	}
 	CorsStringFlag = cli.StringFlag{
 		Name:  "corsdomain",
 		Usage: "Domain on which to send Access-Control-Allow-Origin header (multiple domains can be supplied separated by a ',')",
@@ -265,6 +270,7 @@ Cleans database of corrupted entries.
 		SwarmUploadMimeType,
 		// pss flags
 		PssEnabledFlag,
+		PssPortFlag,
 	}
 	app.Flags = append(app.Flags, debug.Flags...)
 	app.Before = func(ctx *cli.Context) error {
@@ -300,12 +306,19 @@ func version(ctx *cli.Context) error {

 func bzzd(ctx *cli.Context) error {
 	cfg := defaultNodeConfig
+	if ctx.GlobalIsSet(PssEnabledFlag.Name) {
+		cfg.WSHost = "127.0.0.1"
+		cfg.WSModules = []string{"eth", "pss"}
+		cfg.WSOrigins = []string{"*"}
+		if ctx.GlobalIsSet(PssPortFlag.Name) {
+			cfg.WSPort = ctx.GlobalInt(PssPortFlag.Name)
+		}
+	}
 	utils.SetNodeConfig(ctx, &cfg)
 	stack, err := node.New(&cfg)
 	if err != nil {
 		utils.Fatalf("can't create node: %v", err)
 	}
-
 	registerBzzService(ctx, stack)
 	utils.StartNode(stack)
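For orientation: with `--pss` set, it is the node's standard websocket RPC endpoint that carries pss traffic, so an external client simply attaches to it. A minimal sketch of the client side (the port and origin below are assumptions based on the defaults wired up in `bzzd()` above, not part of this patch):

```go
package main

import (
	"context"
	"fmt"

	"github.com/ethereum/go-ethereum/rpc"
)

func main() {
	// node.DefaultWSPort is used unless --pssport overrides it
	client, err := rpc.DialWebsocket(context.Background(), "ws://127.0.0.1:8546", "http://localhost")
	if err != nil {
		fmt.Printf("connect failed: %v\n", err)
		return
	}
	defer client.Close()
	fmt.Println("attached to pss-enabled node")
}
```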
diff --git a/p2p/testing/protocolsession.go b/p2p/testing/protocolsession.go
index 36705d3c19ac..eb8037d69446 100644
--- a/p2p/testing/protocolsession.go
+++ b/p2p/testing/protocolsession.go
@@ -15,7 +15,7 @@ import (

 type ProtocolSession struct {
 	TestNodeAdapter
-	Ids []*adapters.NodeId
+	Ids    []*adapters.NodeId
 	ignore []uint64
 }
@@ -142,7 +142,7 @@ func (self *ProtocolSession) expect(exp Expect) error {
 				} else {
 					log.Warn("expectmsg errormsg parse error?!")
 				}
-			}
+			}
 		} else {
 			log.Warn("expectmsg errormsg parse error?!")
 			break
diff --git a/swarm/api/ws/server.go b/swarm/api/ws/server.go
deleted file mode 100644
index 71cac8c3cd23..000000000000
--- a/swarm/api/ws/server.go
+++ /dev/null
@@ -1,54 +0,0 @@
-package ws
-
-import (
-	"net"
-
-	"github.com/ethereum/go-ethereum/logger"
-	"github.com/ethereum/go-ethereum/logger/glog"
-	"github.com/ethereum/go-ethereum/rpc"
-)
-
-// Server is the basic configuration needs for the HTTP server and also
-// includes CORS settings.
-type Server struct {
-	//Addr string
-	CorsString string
-	Endpoint   string
-}
-
-// startWS initializes and starts the websocket RPC endpoint.
-func StartWSServer(apis []rpc.API, server *Server) error {
-
-	// Generate the whitelist based on the allowed modules
-	/*whitelist := make(map[string]bool)
-	for _, module := range modules {
-		whitelist[module] = true
-	}*/
-	// Register all the APIs exposed by the services
-	handler := rpc.NewServer()
-	for _, api := range apis {
-		//if whitelist[api.Namespace] || (len(whitelist) == 0 && api.Public) {
-		if err := handler.RegisterName(api.Namespace, api.Service); err != nil {
-			return err
-		}
-		glog.V(logger.Debug).Infof("WebSocket registered %T under '%s'", api.Service, api.Namespace)
-		//}
-	}
-	// All APIs registered, start the HTTP listener
-	var (
-		listener net.Listener
-		err      error
-	)
-	if listener, err = net.Listen("tcp", server.Endpoint); err != nil {
-		return err
-	}
-	rpc.NewWSServer(server.CorsString, handler).Serve(listener)
-	glog.V(logger.Info).Infof("WebSocket endpoint opened: ws://%s", server.Endpoint)
-
-	// All listeners booted successfully
-	//n.wsEndpoint = endpoint
-	//n.wsListener = listener
-	//n.wsHandler = handler
-
-	return nil
-}
diff --git a/swarm/api/ws/server_test.go b/swarm/api/ws/server_test.go
deleted file mode 100644
index 56f2aa9fe2bb..000000000000
--- a/swarm/api/ws/server_test.go
+++ /dev/null
@@ -1,62 +0,0 @@
-package ws
-
-import (
-	"context"
-	"testing"
-	"time"
-
-	"github.com/ethereum/go-ethereum/logger"
-	"github.com/ethereum/go-ethereum/logger/glog"
-	"github.com/ethereum/go-ethereum/rpc"
-)
-
-func init() {
-	glog.SetV(logger.Detail)
-	glog.SetToStderr(true)
-}
-
-type TestResult struct {
-	Foo string `json:"foo"`
-}
-
-func TestStartWSServer(t *testing.T) {
-	ep := "localhost:8099"
-	server := &Server{
-		Endpoint:   ep,
-		CorsString: "*",
-	}
-	apis := []rpc.API{
-		{
-			Namespace: "pss",
-			Version:   "0.1",
-			Service:   makeFakeAPIHandler(),
-			Public:    true,
-		},
-	}
-	go func() {
-		err := StartWSServer(apis, server)
-		t.Logf("wsserver exited: %v", err)
-	}()
-
-	time.Sleep(time.Second)
-
-	client, err := rpc.DialWebsocket(context.Background(), "ws://"+ep, "ws://localhost")
-	if err != nil {
-		t.Fatalf("could not connect: %v", err)
-	} else {
-		t.Logf("client: %v", client)
-		client.Call(&TestResult{}, "pss_test")
-	}
-
-}
-
-func makeFakeAPIHandler() *FakeAPIHandler {
-	return &FakeAPIHandler{}
-}
-
-type FakeAPIHandler struct {
-}
-
-func (self *FakeAPIHandler) Test() {
-	glog.V(logger.Detail).Infof("in fakehandler Test()")
-}
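The two deleted files are superseded by the stock rpc package: instead of a bespoke StartWSServer, callers register their APIs on an rpc.Server and serve its websocket handler over plain net/http, which is exactly what the new test code later in this diff does. A minimal sketch of that pattern (the service and endpoint below are placeholders):

```go
package main

import (
	"net"
	"net/http"

	"github.com/ethereum/go-ethereum/rpc"
)

type EchoService struct{}

func (s *EchoService) Echo(msg string) string { return msg }

func main() {
	srv := rpc.NewServer()
	if err := srv.RegisterName("pss", &EchoService{}); err != nil {
		panic(err)
	}
	listener, err := net.Listen("tcp", "localhost:8099")
	if err != nil {
		panic(err)
	}
	// WebsocketHandler wraps the rpc server with origin checks,
	// replacing the removed rpc.NewWSServer wiring
	http.Serve(listener, srv.WebsocketHandler([]string{"*"}))
}
```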
diff --git a/swarm/network/pss.go b/swarm/network/pss.go
index dafcbf713bf7..7aae837c06b4 100644
--- a/swarm/network/pss.go
+++ b/swarm/network/pss.go
@@ -9,6 +9,7 @@ import (
 	"time"

 	"github.com/ethereum/go-ethereum/common"
+	"github.com/ethereum/go-ethereum/event"
 	"github.com/ethereum/go-ethereum/log"
 	"github.com/ethereum/go-ethereum/p2p"
 	"github.com/ethereum/go-ethereum/p2p/adapters"
@@ -27,6 +28,8 @@ const (
 	digestLength          = 64
 	digestCapacity        = 256
 	defaultDigestCacheTTL = time.Second
+	pingTopicName         = "pss"
+	pingTopicVersion      = 1
 )

 var (
@@ -121,6 +124,7 @@ type Pss struct {
 	//peerPool map[pot.Address]map[PssTopic]*PssReadWriter // keep track of all virtual p2p.Peers we are currently speaking to
 	peerPool map[pot.Address]map[PssTopic]p2p.MsgReadWriter     // keep track of all virtual p2p.Peers we are currently speaking to
 	handlers map[PssTopic]func([]byte, *p2p.Peer, []byte) error // topic and version based pss payload handlers
+	events   map[PssTopic]*event.Feed                           // subscriptions for each topic
 	fwdcache map[pssDigest]pssCacheEntry                        // checksum of unique fields from pssmsg mapped to expiry, cache to determine whether to drop msg
 	cachettl time.Duration                                      // how long to keep messages in fwdcache
 	hasher   func(string) storage.Hasher                        // hasher to digest message to cache
@@ -148,6 +152,7 @@ func NewPss(k Overlay, params *PssParams) *Pss {
 		//peerPool: make(map[pot.Address]map[PssTopic]*PssReadWriter, PssPeerCapacity),
 		peerPool: make(map[pot.Address]map[PssTopic]p2p.MsgReadWriter, PssPeerCapacity),
 		handlers: make(map[PssTopic]func([]byte, *p2p.Peer, []byte) error),
+		events:   make(map[PssTopic]*event.Feed),
 		fwdcache: make(map[pssDigest]pssCacheEntry),
 		cachettl: params.Cachettl,
 		hasher:   storage.MakeHashFunc,
@@ -210,11 +215,24 @@ func (self *Pss) checkFwdCache(addr []byte, digest pssDigest) bool {
 func (self *Pss) Register(topic PssTopic, handler func(msg []byte, p *p2p.Peer, from []byte) error) error {
 	self.lock.Lock()
 	defer self.lock.Unlock()
-	self.handlers[topic] = handler
+	self.handlers[topic] = func(msg []byte, p *p2p.Peer, from []byte) error {
+		self.alertSubscribers(&topic, msg)
+		return handler(msg, p, from)
+	}
+	self.registerFeed(topic)
 	return nil
 }

-// Retrieves the handler function registered by *Pss.Register()
+func (self *Pss) Subscribe(topic *PssTopic, ch chan []byte) (event.Subscription, error) {
+	_, ok := self.events[*topic]
+	if !ok {
+		return nil, fmt.Errorf("No feed registered for topic %v", topic)
+	}
+	sub := self.events[*topic].Subscribe(ch)
+	log.Trace("new pss subscribe", "topic", topic, "sub", sub)
+	return sub, nil
+}
+
 func (self *Pss) GetHandler(topic PssTopic) func([]byte, *p2p.Peer, []byte) error {
 	self.lock.Lock()
 	defer self.lock.Unlock()
@@ -263,6 +281,20 @@ func (self *Pss) isActive(id pot.Address, topic PssTopic) bool {
 	return true
 }

+func (self *Pss) registerFeed(topic PssTopic) {
+	self.events[topic] = &event.Feed{}
+}
+
+func (self *Pss) alertSubscribers(topic *PssTopic, msg []byte) error {
+	feed, ok := self.events[*topic]
+	if !ok {
+		return fmt.Errorf("No subscriptions registered for topic %v", topic)
+	}
+	numsent := feed.Send(msg)
+	log.Trace(fmt.Sprintf("pss sent to %d subscribers", numsent))
+	return nil
+}
+
 // Sends a message using pss. The message could be anything at all, and will be handled by whichever handler function is mapped to PssTopic using *Pss.Register()
 //
 // The to address is a swarm overlay address
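A sketch of how the feed-backed subscription added above is meant to be consumed from Go. It assumes a *Pss instance `ps` with a handler already registered for `topic`, since Register is what creates the feed (imports of `fmt` and the log package are assumed):

```go
func consume(ps *Pss, topic PssTopic) error {
	ch := make(chan []byte)
	sub, err := ps.Subscribe(&topic, ch)
	if err != nil {
		return err
	}
	defer sub.Unsubscribe()
	for {
		select {
		case msg := <-ch:
			// every payload delivered to the topic handler is fanned out here too
			log.Trace(fmt.Sprintf("pss msg on topic %x: %x", topic, msg))
		case err := <-sub.Err():
			return err
		}
	}
}
```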
@@ -289,7 +321,7 @@ func (self *Pss) Send(to []byte, topic PssTopic, msg []byte) error {

 // Handlers that want to pass on a message should call this directly
 func (self *Pss) Forward(msg *PssMsg) error {
-	if self.isSelfRecipient(msg) {
+	if self.IsSelfRecipient(msg) {
 		return errorForwardToSelf
 	}

@@ -326,7 +358,7 @@ func (self *Pss) Forward(msg *PssMsg) error {
 		return true
 	})
 	if sent == 0 {
-		log.Warn("PSS Was not able to send to any peers")
+		return fmt.Errorf("PSS was not able to send to any peers")
 	} else {
 		self.addFwdCacheExpire(digest)
 	}
@@ -436,13 +468,24 @@ func (self *PssProtocol) handle(msg []byte, p *p2p.Peer, senderAddr []byte) erro
 	return nil
 }

-func (ps *Pss) isSelfRecipient(msg *PssMsg) bool {
-	if bytes.Equal(msg.GetRecipient(), ps.Overlay.GetAddr().OverlayAddr()) {
+func (self *Pss) IsSelfRecipient(msg *PssMsg) bool {
+	if bytes.Equal(msg.GetRecipient(), self.Overlay.GetAddr().OverlayAddr()) {
 		return true
 	}
 	return false
 }

+func (self *Pss) GetPingHandler() func([]byte, *p2p.Peer, []byte) error {
+	pingtopic, _ := MakeTopic(pingTopicName, pingTopicVersion)
+	return func(msg []byte, p *p2p.Peer, from []byte) error {
+		if bytes.Equal([]byte("ping"), msg) {
+			log.Trace(fmt.Sprintf("swarm pss ping from %x sending pong", common.ByteLabel(from)))
+			self.Send(from, pingtopic, []byte("pong"))
+		}
+		return nil
+	}
+}
+
 // Pre-Whisper placeholder
 func makeMsg(code uint64, msg interface{}) ([]byte, error) {
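GetPingHandler above gives any node a trivial liveness responder. Wiring it up and pinging a peer would look roughly like this (a sketch; `ps` and the peer's overlay address `peerAddr` are assumed to exist, and `bytes` is assumed imported):

```go
func pingPeer(ps *Pss, peerAddr []byte) error {
	pingtopic, _ := MakeTopic(pingTopicName, pingTopicVersion)
	// respond to incoming pings; Register also creates the subscription feed
	ps.Register(pingtopic, ps.GetPingHandler())

	ch := make(chan []byte)
	sub, err := ps.Subscribe(&pingtopic, ch)
	if err != nil {
		return err
	}
	defer sub.Unsubscribe()

	if err := ps.Send(peerAddr, pingtopic, []byte("ping")); err != nil {
		return err
	}
	if bytes.Equal(<-ch, []byte("pong")) {
		log.Debug("peer is alive")
	}
	return nil
}
```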
diff --git a/swarm/network/pss_test.go b/swarm/network/pss_test.go
index c3cd3b6e4dd4..4b705ca6bf64 100644
--- a/swarm/network/pss_test.go
+++ b/swarm/network/pss_test.go
@@ -5,6 +5,8 @@ import (
 	"encoding/hex"
 	"fmt"
 	"math/rand"
+	"net"
+	"net/http"
 	"os"
 	"strconv"
 	"testing"
@@ -31,7 +33,7 @@ func init() {

 // example protocol implementation peer
 // message handlers are methods of this
-// goal is that we can use the same for "normal" p2p.protocols operations as well as pss
+// channels allow receipt reporting from p2p.Protocol message handler
 type pssTestPeer struct {
 	*protocols.Peer
 	hasProtocol bool
@@ -41,6 +43,7 @@ type pssTestPeer struct {

 // example node simulation peer
 // modeled from swarm/network/simulations/discovery/discovery_test.go - commit 08b1e42f
+// contains reporting channel for expect results so we can collect all async incoming msgs before deciding results
 type pssTestNode struct {
 	*Hive
 	*Pss
@@ -52,6 +55,8 @@ type pssTestNode struct {
 	run     adapters.ProtoCall
 	ct      *protocols.CodeMap
 	expectC chan []int
+	ws      *http.Handler
+	apifunc func() []rpc.API
 }

 func (n *pssTestNode) Add(peer Peer) error {
@@ -74,24 +79,14 @@ func (n *pssTestNode) Stop() error {
 	return nil
 }

-func (n *pssTestNode) connectPeer(s string) error {
-	return n.network.Connect(n.id, adapters.NewNodeIdFromHex(s))
-}
-
 func (n *pssTestNode) hiveKeepAlive() <-chan time.Time {
 	return time.Tick(time.Second * 10)
 }

 func (n *pssTestNode) triggerCheck() {
-	//go func() { n.trigger <- n.id }()
 	go func() { n.trigger <- adapters.NewNodeId(n.Addr()) }()
 }

-/*
-func (n *pssTestNode) RunProtocol(id *adapters.NodeId, rw, rrw p2p.MsgReadWriter, peer *adapters.Peer) error {
-	return n.NodeAdapter.(adapters.ProtocolRunner).RunProtocol(id, rw, rrw, peer)
-}
-*/
 func (n *pssTestNode) ProtoCall() adapters.ProtoCall {
 	return n.run
 }
@@ -102,18 +97,75 @@ func (n *pssTestNode) OverlayAddr() []byte {

 func (n *pssTestNode) UnderlayAddr() []byte {
 	return n.id.Bytes()
-	//return n.Addr()
 }

 // the content of the msgs we're sending in the tests
-type PssTestPayload struct {
+type pssTestPayload struct {
 	Data string
 }

-func (m *PssTestPayload) String() string {
+func (m *pssTestPayload) String() string {
 	return m.Data
 }

+type pssTestService struct {
+	node    *pssTestNode // get addrs from this
+	msgFunc func(interface{}) error
+}
+
+func newPssTestService(t *testing.T, handlefunc func(interface{}) error, testnode *pssTestNode) *pssTestService {
+	hp := NewHiveParams()
+	//hp.CallInterval = 250
+	testnode.Hive = NewHive(hp, testnode.Pss.Overlay)
+	return &pssTestService{
+		//nid := adapters.NewNodeId(addr.UnderlayAddr())
+		msgFunc: handlefunc,
+		node:    testnode,
+	}
+}
+
+func (self *pssTestService) Start(server p2p.Server) error {
+	self.node.SimNode = server.(*adapters.SimNode) // server is an *adapters.SimNode here
+	return nil
+}
+
+func (self *pssTestService) Stop() error {
+	return nil
+}
+
+func (self *pssTestService) Protocols() []p2p.Protocol {
+	ct := BzzCodeMap()
+	for _, m := range DiscoveryMsgs {
+		ct.Register(m)
+	}
+	ct.Register(&PssMsg{})
+
+	srv := func(p Peer) error {
+		p.Register(&PssMsg{}, self.msgFunc)
+		self.node.Add(p)
+		p.DisconnectHook(func(err error) {
+			self.node.Remove(p)
+		})
+		return nil
+	}
+
+	proto := Bzz(self.node.OverlayAddr(), self.node.UnderlayAddr(), ct, srv, nil, nil)
+
+	return []p2p.Protocol{*proto}
+}
+
+func (self *pssTestService) APIs() []rpc.API {
+	return []rpc.API{
+		rpc.API{
+			Namespace: "eth",
+			Version:   "0.1/pss",
+			Service:   NewPssApi(self.node.Pss),
+			Public:    true,
+		},
+	}
+}
+
 func TestPssCache(t *testing.T) {
 	var err error
 	to, _ := hex.DecodeString("08090a0b0c0d0e0f1011121314150001020304050607161718191a1b1c1d1e1f")
@@ -266,7 +318,7 @@ func testPssFullRandom(t *testing.T, numsends int, numnodes int, numfullnodes in
 	expectnodesids := []*adapters.NodeId{}                 // the nodes to expect on (needed by checker)
 	expectnodesresults := make(map[*adapters.NodeId][]int) // which messages expect actually got

-	vct := protocols.NewCodeMap(protocolName, protocolVersion, 65535, &PssTestPayload{})
+	vct := protocols.NewCodeMap(protocolName, protocolVersion, 65535, &pssTestPayload{})
 	topic, _ := MakeTopic(protocolName, protocolVersion)

 	trigger := make(chan *adapters.NodeId)
@@ -367,10 +419,10 @@ func testPssFullRandom(t *testing.T, numsends int, numnodes int, numfullnodes in

 	// send and monitor receive of pss
 	action = func(ctx context.Context) error {
-		code, _ := vct.GetCode(&PssTestPayload{})
+		code, _ := vct.GetCode(&pssTestPayload{})
 		for i := 0; i < len(sends); i += 2 {
-			msgbytes, _ := makeMsg(code, &PssTestPayload{
+			msgbytes, _ := makeMsg(code, &pssTestPayload{
 				Data: fmt.Sprintf("%v", i+1),
 			})
 			go func(i int, expectnodesresults map[*adapters.NodeId][]int) {
@@ -464,8 +516,6 @@ func testPssFullRandom(t *testing.T, numsends int, numnodes int, numfullnodes in
 	}
 }

-// pss simulation test
-// (simnodes running protocols)
 func TestPssFullLinearEcho(t *testing.T) {

 	var action func(ctx context.Context) error
@@ -478,7 +528,7 @@ func TestPssFullLinearEcho(t *testing.T) {
 	var firstpssnode *adapters.NodeId
 	var secondpssnode *adapters.NodeId

-	vct := protocols.NewCodeMap(protocolName, protocolVersion, 65535, &PssTestPayload{})
+	vct := protocols.NewCodeMap(protocolName, protocolVersion, 65535, &pssTestPayload{})
 	topic, _ := MakeTopic(protocolName, protocolVersion)

 	fullnodes := []*adapters.NodeId{}
@@ -598,8 +648,8 @@ func TestPssFullLinearEcho(t *testing.T) {
 	}

 	action = func(ctx context.Context) error {
-		code, _ := vct.GetCode(&PssTestPayload{})
-		msgbytes, _ := makeMsg(code, &PssTestPayload{
+		code, _ := vct.GetCode(&pssTestPayload{})
+		msgbytes, _ := makeMsg(code, &pssTestPayload{
 			Data: "ping",
 		})
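The tests above all build payloads the same way: look the concrete type up in the virtual protocol's CodeMap, then wrap it with makeMsg. Distilled into one helper (names mirror the test code; the error handling is added here for clarity):

```go
func buildPing(vct *protocols.CodeMap) ([]byte, error) {
	code, found := vct.GetCode(&pssTestPayload{})
	if !found {
		return nil, fmt.Errorf("pssTestPayload not registered in code map")
	}
	// makeMsg rlp-encodes the payload together with its wire code
	return makeMsg(code, &pssTestPayload{Data: "ping"})
}
```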
@@ -646,6 +696,234 @@ func TestPssFullLinearEcho(t *testing.T) {
 	t.Log("Simulation Passed:")
 }

+func TestPssFullWS(t *testing.T) {
+
+	// settings for ws servers
+	var srvsendep = "localhost:18546"
+	var srvrecvep = "localhost:18547"
+	var clientrecvok, clientsendok bool
+	var clientrecv, clientsend *rpc.Client
+
+	var action func(ctx context.Context) error
+	var check func(ctx context.Context, id *adapters.NodeId) (bool, error)
+	var ctx context.Context
+	var result *simulations.StepResult
+	var timeout time.Duration
+	var cancel context.CancelFunc
+
+	var firstpssnode, secondpssnode *adapters.NodeId
+	fullnodes := []*adapters.NodeId{}
+	vct := protocols.NewCodeMap(protocolName, protocolVersion, 65535, &pssTestPayload{})
+	topic, _ := MakeTopic(pingTopicName, pingTopicVersion)
+
+	trigger := make(chan *adapters.NodeId)
+	simnet := simulations.NewNetwork(&simulations.NetworkConfig{
+		Id:      "0",
+		Backend: true,
+	})
+	testpeers := make(map[*adapters.NodeId]*pssTestPeer)
+	nodes := newPssSimulationTester(t, 3, 2, simnet, trigger, vct, protocolName, protocolVersion, testpeers)
+	ids := []*adapters.NodeId{} // ohh risky! but the action for a specific id should come before the expect anyway
+
+	action = func(ctx context.Context) error {
+		var thinnodeid *adapters.NodeId
+		for id, node := range nodes {
+			ids = append(ids, id)
+			if _, ok := testpeers[id]; ok {
+				log.Trace(fmt.Sprintf("adding fullnode %x to testpeers %p", common.ByteLabel(id.Bytes()), testpeers))
+				fullnodes = append(fullnodes, id)
+				node.Pss.Register(topic, node.Pss.GetPingHandler())
+				srv := rpc.NewServer()
+				for _, rpcapi := range node.apifunc() {
+					srv.RegisterName(rpcapi.Namespace, rpcapi.Service)
+				}
+				ws := srv.WebsocketHandler([]string{"*"})
+				node.ws = &ws
+			} else {
+				thinnodeid = id
+			}
+		}
+		if err := simnet.Connect(fullnodes[0], thinnodeid); err != nil {
+			return err
+		}
+		if err := simnet.Connect(thinnodeid, fullnodes[1]); err != nil {
+			return err
+		}
+
+		return nil
+	}
+
+	check = func(ctx context.Context, id *adapters.NodeId) (bool, error) {
+		select {
+		case <-ctx.Done():
+			return false, ctx.Err()
+		default:
+		}
+
+		node, ok := nodes[id]
+		if !ok {
+			return false, fmt.Errorf("unknown node: %s (%v)", id, node)
+		} else {
+			log.Trace(fmt.Sprintf("sim check ok node %v", id))
+		}
+
+		return true, nil
+	}
+
+	timeout = 10 * time.Second
+	ctx, cancel = context.WithTimeout(context.Background(), timeout)
+
+	result = simulations.NewSimulation(simnet).Run(ctx, &simulations.Step{
+		Action:  action,
+		Trigger: trigger,
+		Expect: &simulations.Expectation{
+			Nodes: ids,
+			Check: check,
+		},
+	})
+	if result.Error != nil {
+		t.Fatalf("simulation failed: %s", result.Error)
+	}
+	cancel()
+
+	nonode := &adapters.NodeId{}
+	firstpssnode = nonode
+	secondpssnode = nonode
+
+	// first find a node that we're connected to
+	for firstpssnode == nonode {
+		log.Debug(fmt.Sprintf("Waiting for pss relaypeer for %x close to %x ...", common.ByteLabel(nodes[fullnodes[0]].OverlayAddr()), common.ByteLabel(nodes[fullnodes[1]].OverlayAddr())))
+		nodes[fullnodes[0]].Pss.Overlay.EachLivePeer(nodes[fullnodes[1]].OverlayAddr(), 256, func(p Peer, po int, isprox bool) bool {
+			for _, id := range ids {
+				if id.NodeID == p.ID() {
+					firstpssnode = id
+					log.Debug(fmt.Sprintf("PSS relay found; relaynode %x", common.ByteLabel(nodes[firstpssnode].OverlayAddr())))
+				}
+			}
+			if firstpssnode == nonode {
+				return true
+			}
+			return false
+		})
+		if firstpssnode == nonode {
+			time.Sleep(time.Millisecond * 100)
+		}
+	}
+
+	// then find the node it's connected to
+	for secondpssnode == nonode {
+		log.Debug(fmt.Sprintf("PSS kademlia: Waiting for recipientpeer for %x close to %x ...", common.ByteLabel(nodes[firstpssnode].OverlayAddr()), common.ByteLabel(nodes[fullnodes[1]].OverlayAddr())))
+		nodes[firstpssnode].Pss.Overlay.EachLivePeer(nodes[fullnodes[1]].OverlayAddr(), 256, func(p Peer, po int, isprox bool) bool {
+			for _, id := range ids {
+				if id.NodeID == p.ID() && id.NodeID != fullnodes[0].NodeID {
+					secondpssnode = id
+					log.Debug(fmt.Sprintf("PSS recipient found; relaynode %x", common.ByteLabel(nodes[secondpssnode].OverlayAddr())))
+				}
+			}
+			if secondpssnode == nonode {
+				return true
+			}
+			return false
+		})
+		if secondpssnode == nonode {
+			time.Sleep(time.Millisecond * 100)
+		}
+	}
+
+	srvrecvl, err := net.Listen("tcp", srvrecvep)
+	if err != nil {
+		t.Fatalf("Tcp (recv) on %s failed: %v", srvrecvep, err)
+	}
+	go func() {
+		err := http.Serve(srvrecvl, *nodes[fullnodes[1]].ws)
+		if err != nil {
+			t.Fatalf("http serve (recv) on %s failed: %v", srvrecvep, err)
+		}
+	}()
+
+	srvsendl, err := net.Listen("tcp", srvsendep)
+	if err != nil {
+		t.Fatalf("Tcp (send) on %s failed: %v", srvsendep, err)
+	}
+	go func() {
+		err := http.Serve(srvsendl, *nodes[fullnodes[0]].ws)
+		if err != nil {
+			t.Fatalf("http serve (send) on %s failed: %v", srvsendep, err)
+		}
+	}()
+
+	for !clientrecvok {
+		log.Trace("attempting clientrecv connect")
+		clientrecv, err = rpc.DialWebsocket(context.Background(), "ws://"+srvrecvep, "ws://localhost")
+		if err == nil {
+			clientrecvok = true
+		} else {
+			log.Debug("clientrecv failed, retrying", "error", err)
+			time.Sleep(time.Millisecond * 250)
+		}
+	}
+
+	for !clientsendok {
+		log.Trace("attempting clientsend connect")
+		clientsend, err = rpc.DialWebsocket(context.Background(), "ws://"+srvsendep, "ws://localhost")
+		if err == nil {
+			clientsendok = true
+		} else {
+			log.Debug("clientsend failed, retrying", "error", err)
+			time.Sleep(time.Millisecond * 250)
+		}
+	}
+
+	trigger = make(chan *adapters.NodeId)
+	ch := make(chan string)
+
+	action = func(ctx context.Context) error {
+		go func() {
+			clientrecv.EthSubscribe(ctx, ch, "newMsg", topic)
+			clientsend.Call(nil, "eth_sendRaw", nodes[secondpssnode].Pss.Overlay.GetAddr().OverlayAddr(), topic, []byte("ping"))
+			trigger <- secondpssnode
+		}()
+		return nil
+	}
+	check = func(ctx context.Context, id *adapters.NodeId) (bool, error) {
+		select {
+		case <-ctx.Done():
+			return false, ctx.Err()
+		default:
+		}
+
+		select {
+		case msg := <-ch:
+			log.Trace(fmt.Sprintf("notify!: %v", msg))
+		case <-time.NewTimer(time.Second).C:
+			log.Trace(fmt.Sprintf("no notifies :'("))
+		}
+		// also need to know if the protocolpeer is set up
+
+		return true, nil
+	}
+
+	timeout = 10 * time.Second
+	ctx, cancel = context.WithTimeout(context.Background(), timeout)
+	defer cancel()
+
+	result = simulations.NewSimulation(simnet).Run(ctx, &simulations.Step{
+		Action:  action,
+		Trigger: trigger,
+		Expect: &simulations.Expectation{
+			Nodes: []*adapters.NodeId{secondpssnode},
+			Check: check,
+		},
+	})
+	if result.Error != nil {
+		t.Fatalf("simulation failed: %s", result.Error)
+	}
+
+	t.Log("Simulation Passed:")
+}
+
+// test framework below
+
 // numnodes: how many nodes to create
 // pssnodeidx: on which node indices to start the pss
 // net: the simulated network
@@ -655,36 +933,51 @@ func TestPssFullLinearEcho(t *testing.T) {
 // version: name for virtual protocol (and pss topic)
 // testpeers: pss-specific peers, with hook needed for simulation event reporting
-func newPssSimulationTester(t *testing.T, numnodes int, numfullnodes int, net *simulations.Network, trigger chan *adapters.NodeId, vct *protocols.CodeMap, name string, version int, testpeers map[*adapters.NodeId]*pssTestPeer) map[*adapters.NodeId]*pssTestNode {
+// the simulation tester constructor is currently a hack to fit previous code with later stack using node.Services to start SimNodes
+
+func newPssSimulationTester(t *testing.T, numnodes int, numfullnodes int, simnet *simulations.Network, trigger chan *adapters.NodeId, vct *protocols.CodeMap, name string, version int, testpeers map[*adapters.NodeId]*pssTestPeer) map[*adapters.NodeId]*pssTestNode {
 	topic, _ := MakeTopic(name, version)
 	nodes := make(map[*adapters.NodeId]*pssTestNode, numnodes)
-	//svcs := make(map[*adapters.NodeId]*pssTestService, numnodes)
 	psss := make(map[*adapters.NodeId]*Pss)
-	net.SetNaf(func(conf *simulations.NodeConfig) adapters.NodeAdapter {
+	simnet.SetNaf(func(conf *simulations.NodeConfig) adapters.NodeAdapter {
 		node := &pssTestNode{
 			Pss:     psss[conf.Id],
 			Hive:    nil,
 			SimNode: &adapters.SimNode{},
 			id:      conf.Id,
-			network: net,
+			network: simnet,
 			trigger: trigger,
 			ct:      vct,
+			apifunc: func() []rpc.API { return nil },
 			expectC: make(chan []int),
 		}
+
+		// set up handlers for the encapsulating PssMsg
+		var handlefunc func(interface{}) error
+
 		addr := NewPeerAddrFromNodeId(conf.Id)
+
 		if testpeers[conf.Id] != nil {
 			handlefunc = makePssHandleProtocol(psss[conf.Id])
 			log.Trace(fmt.Sprintf("Making full protocol id %x addr %x (testpeers %p)", common.ByteLabel(conf.Id.Bytes()), common.ByteLabel(addr.OverlayAddr()), testpeers))
 		} else {
 			handlefunc = makePssHandleForward(psss[conf.Id])
 		}
-		//node = newPssTester(t, psss[conf.Id], addr, 0, handlefunc, net, trigger)
+
+		// protocols are now registered by invoking node services
+		// since adapters.SimNode implements p2p.Server, needed for the services to start, we use this as a convenience wrapper
 		testservice := newPssTestService(t, handlefunc, node)
-		nodes[conf.Id] = testservice.node
-		svc := adapters.NewSimNode(conf.Id, testservice, net)
+		svc := adapters.NewSimNode(conf.Id, testservice, simnet)
 		testservice.Start(svc)
-		//svcs[conf.Id] = testservice
+
+		// the network sim wants an adapters.NodeAdapter, so we pass back to it a SimNode
+		// this is the SimNode member of the testNode initialized above, but assigned through the service start
+		// that is to say: node == testservice.node, but we access it as a member of testservice below for clarity (to the extent that this can be clear)
+
+		nodes[conf.Id] = testservice.node
+		testservice.node.apifunc = testservice.APIs
 		return node.SimNode
 	})
 	configs := make([]*simulations.NodeConfig, numnodes)
@@ -708,105 +1001,14 @@ func newPssSimulationTester(t *testing.T, numnodes int, numfullnodes int, net *s
 		psss[conf.Id].Register(topic, pssprotocol.GetHandler())
 	}

-	net.NewNodeWithConfig(conf)
-	if err := net.Start(conf.Id); err != nil {
+	simnet.NewNodeWithConfig(conf)
+	if err := simnet.Start(conf.Id); err != nil {
 		t.Fatalf("error starting node %s: %s", conf.Id.Label(), err)
 	}
 }

 return nodes
-	//return svcs
-}
-
-type pssTestService struct {
-	node    *pssTestNode // get addrs from this
-	msgFunc func(interface{}) error
-}
-
-func newPssTestService(t *testing.T, handlefunc func(interface{}) error, testnode *pssTestNode) *pssTestService {
-	hp := NewHiveParams()
-	//hp.CallInterval = 250
-	testnode.Hive = NewHive(hp, testnode.Pss.Overlay)
-	return &pssTestService{
-		//nid := adapters.NewNodeId(addr.UnderlayAddr())
-		msgFunc: handlefunc,
-		node:    testnode,
-	}
-}
-
-func (self *pssTestService) Start(server p2p.Server) error {
-	self.node.SimNode = server.(*adapters.SimNode) // server is adapter.SimnNode now
-	return nil
-}
-
-func (self *pssTestService) Stop() error {
-	return nil
-}
-
-func (self *pssTestService) Protocols() []p2p.Protocol {
-	ct := BzzCodeMap()
-	for _, m := range DiscoveryMsgs {
-		ct.Register(m)
-	}
-	ct.Register(&PssMsg{})
-	//Bzz(addr.OverlayAddr(), addr.UnderlayAddr(), ct, srv, nil, nil).Run
-	/*
-		node := &pssTestNode{
-			Hive:        hive,
-			Pss:         ps,
-			NodeAdapter: nil,
-			id:          nid,
-			network:     net,
-			trigger:     trigger,
-			ct:          ct,
-			expectC:     make(chan []int),
-		}*/
-
-	srv := func(p Peer) error {
-		p.Register(&PssMsg{}, self.msgFunc)
-		self.node.Add(p)
-		p.DisconnectHook(func(err error) {
-			self.node.Remove(p)
-		})
-		return nil
-	}
-
-	proto := Bzz(self.node.OverlayAddr(), self.node.UnderlayAddr(), ct, srv, nil, nil)
-
-	return []p2p.Protocol{*proto}
-}
-
-func (self *pssTestService) APIs() []rpc.API {
-	return nil
-}
-
-/*
-func newPssTester(t *testing.T, ps *Pss, addr *peerAddr, numsimnodes int, handlefunc func(interface{}) error, net *simulations.Network, trigger chan *adapters.NodeId) *pssTestNode {
-
-	// set up the outer protocol
-
-	srv := func(p Peer) error {
-		p.Register(&PssMsg{}, handlefunc)
-		node.Add(p)
-		p.DisconnectHook(func(err error) {
-			hive.Remove(p)
-		})
-		return nil
-	}
-
-	node.run = Bzz(addr.OverlayAddr(), addr.UnderlayAddr(), ct, srv, nil, nil).Run
-	nodeAdapter.Run = node.run
-
-	node.NodeAdapter = adapters.NewSimNode(nid, net)
-
-	return node
-}
-*/

 func makePss(addr []byte) *Pss {
 	kp := NewKadParams()
@@ -816,7 +1018,7 @@ func makePss(addr []byte) *Pss {

 	overlay := NewKademlia(addr, kp)
 	ps := NewPss(overlay, pp)
-	overlay.Prune(time.Tick(time.Millisecond * 250))
+	//overlay.Prune(time.Tick(time.Millisecond * 250))
 	return ps
 }

@@ -827,7 +1029,7 @@ func makeCustomProtocol(name string, version int, ct *protocols.CodeMap, testpee
 			testpeer = &pssTestPeer{}
 		}
 		testpeer.Peer = p
-		p.Register(&PssTestPayload{}, testpeer.SimpleHandlePssPayload)
+		p.Register(&pssTestPayload{}, testpeer.SimpleHandlePssPayload)
 		err := p.Run()
 		return err
 	}
@@ -836,7 +1038,7 @@ func makeCustomProtocol(name string, version int, ct *protocols.CodeMap, testpee
 }

 func makeFakeMsg(ps *Pss, ct *protocols.CodeMap, topic PssTopic, senderaddr PeerAddr, content string) PssMsg {
-	data := PssTestPayload{}
+	data := pssTestPayload{}
 	code, found := ct.GetCode(&data)
 	if !found {
 		return PssMsg{}
@@ -868,7 +1070,7 @@ func makePssHandleForward(ps *Pss) func(msg interface{}) error {
 	// for the simple check it passes on the message if it's not for us
 	return func(msg interface{}) error {
 		pssmsg := msg.(*PssMsg)
-		if ps.isSelfRecipient(pssmsg) {
+		if ps.IsSelfRecipient(pssmsg) {
 			log.Trace("pss for us .. yay!")
 		} else {
 			log.Trace("passing on pss")
@@ -882,7 +1084,7 @@ func makePssHandleProtocol(ps *Pss) func(msg interface{}) error {
 	return func(msg interface{}) error {
 		pssmsg := msg.(*PssMsg)

-		if ps.isSelfRecipient(pssmsg) {
+		if ps.IsSelfRecipient(pssmsg) {
 			log.Trace("pss for us ... let's process!")
let's process!") env := pssmsg.Payload umsg := env.Payload // this will be rlp encrypted @@ -905,21 +1107,21 @@ func makePssHandleProtocol(ps *Pss) func(msg interface{}) error { // it comes in through // Any pointer receiver that has protocols.Peer func (ptp *pssTestPeer) SimpleHandlePssPayload(msg interface{}) error { - pmsg := msg.(*PssTestPayload) - log.Trace(fmt.Sprintf("PssTestPayloadhandler got message %v", pmsg)) + pmsg := msg.(*pssTestPayload) + log.Trace(fmt.Sprintf("pssTestPayloadhandler got message %v", pmsg)) if pmsg.Data == "ping" { pmsg.Data = "pong" - log.Trace(fmt.Sprintf("PssTestPayloadhandler reply %v", pmsg)) + log.Trace(fmt.Sprintf("pssTestPayloadhandler reply %v", pmsg)) ptp.Send(pmsg) } else if pmsg.Data == "pong" { ptp.successC <- true } else { res, err := strconv.Atoi(pmsg.Data) if err != nil { - log.Trace(fmt.Sprintf("PssTestPayloadhandlererr %v", err)) + log.Trace(fmt.Sprintf("pssTestPayloadhandlererr %v", err)) ptp.successC <- false } else { - log.Trace(fmt.Sprintf("PssTestPayloadhandler sending %d on chan", pmsg)) + log.Trace(fmt.Sprintf("pssTestPayloadhandler sending %d on chan", pmsg)) ptp.successC <- true ptp.resultC <- res } diff --git a/swarm/network/pssapi.go b/swarm/network/pssapi.go new file mode 100644 index 000000000000..82b45e8b03ef --- /dev/null +++ b/swarm/network/pssapi.go @@ -0,0 +1,66 @@ +package network + +import ( + "context" + "fmt" + + "github.com/ethereum/go-ethereum/log" + "github.com/ethereum/go-ethereum/rpc" +) + +type PssApi struct { + *Pss +} + +func NewPssApi(ps *Pss) *PssApi { + return &PssApi{Pss: ps} +} + +func (self *PssApi) NewMsg(ctx context.Context, topic PssTopic) (*rpc.Subscription, error) { + notifier, supported := rpc.NotifierFromContext(ctx) + if !supported { + return nil, fmt.Errorf("Subscribe not supported") + } + + sub := notifier.CreateSubscription() + + ch := make(chan []byte) + psssub, err := self.Pss.Subscribe(&topic, ch) + if err != nil { + return nil, fmt.Errorf("pss subscription topic %v (rpc sub id %v) failed: %v", topic, sub.ID, err) + } + + go func(topic PssTopic) error { + for { + select { + case msg := <-ch: + if err := notifier.Notify(sub.ID, msg); err != nil { + log.Warn(fmt.Sprintf("notification on pss sub topic %v rpc (sub %v) msg %v failed!", topic, sub.ID, msg)) + return err + } + case err := <-psssub.Err(): + log.Warn(fmt.Sprintf("caught subscription error in pss sub topic: %v", topic, err)) + return err + case <-notifier.Closed(): + log.Warn(fmt.Sprintf("rpc sub notifier closed")) + psssub.Unsubscribe() + return nil + case err := <-sub.Err(): + log.Warn(fmt.Sprintf("rpc sub closed: %v", err)) + psssub.Unsubscribe() + return nil + } + } + return nil + }(topic) + + return sub, nil +} + +func (self *PssApi) SendRaw(to []byte, topic PssTopic, msg []byte) error { + err := self.Pss.Send(to, topic, msg) + if err != nil { + return fmt.Errorf("send error: %v", err) + } + return fmt.Errorf("ok sent") +} diff --git a/swarm/swarm.go b/swarm/swarm.go index 8e77a37f858e..9a2cb3a4728c 100644 --- a/swarm/swarm.go +++ b/swarm/swarm.go @@ -33,7 +33,6 @@ import ( "github.com/ethereum/go-ethereum/node" "github.com/ethereum/go-ethereum/p2p" "github.com/ethereum/go-ethereum/p2p/adapters" - "github.com/ethereum/go-ethereum/p2p/discover" "github.com/ethereum/go-ethereum/rpc" "github.com/ethereum/go-ethereum/swarm/api" httpapi "github.com/ethereum/go-ethereum/swarm/api/http" @@ -47,10 +46,8 @@ type Swarm struct { config *api.Config // swarm configuration api *api.Api // high level api layer (fs/manifest) dns api.Resolver // 
diff --git a/swarm/swarm.go b/swarm/swarm.go
index 8e77a37f858e..9a2cb3a4728c 100644
--- a/swarm/swarm.go
+++ b/swarm/swarm.go
@@ -33,7 +33,6 @@ import (
 	"github.com/ethereum/go-ethereum/node"
 	"github.com/ethereum/go-ethereum/p2p"
 	"github.com/ethereum/go-ethereum/p2p/adapters"
-	"github.com/ethereum/go-ethereum/p2p/discover"
 	"github.com/ethereum/go-ethereum/rpc"
 	"github.com/ethereum/go-ethereum/swarm/api"
 	httpapi "github.com/ethereum/go-ethereum/swarm/api/http"
@@ -47,10 +46,8 @@ type Swarm struct {
 	config  *api.Config        // swarm configuration
 	api     *api.Api           // high level api layer (fs/manifest)
 	dns     api.Resolver       // DNS registrar
-	//dbAccess *network.DbAccess // access to local chunk db iterator and storage counter
 	storage storage.ChunkStore // internal access to storage, common interface to cloud storage backends
 	dpa     *storage.DPA       // distributed preimage archive, the local API to the storage with document level storage/retrieval support
-	//depo network.StorageHandler // remote request handler, interface between bzz protocol and the storage cloud
 	cloud   storage.CloudStore // procurement, cloud storage backend (can multi-cloud)
 	hive    *network.Hive      // the logistic manager backend
 	chequebook.Backend         // simple blockchain Backend
@@ -104,11 +101,7 @@ func NewSwarm(ctx *node.ServiceContext, backend chequebook.Backend, config *api.
 		return
 	}

-	// setup local store
-	//glog.V(logger.Debug).Infof("Set up local storage")
-
-	//self.dbAccess = network.NewDbAccess(self.lstore)
-	glog.V(logger.Debug).Infof("Set up local db access (iterator/counter)")
+	log.Debug("Set up local db access (iterator/counter)")

 	kp := network.NewKadParams()

@@ -122,22 +115,15 @@ func NewSwarm(ctx *node.ServiceContext, backend chequebook.Backend, config *api.
 		to,
 	)
 	log.Debug(fmt.Sprintf("Set up swarm network with Kademlia hive"))
-
-	// setup cloud storage backend
-	//cloud := network.NewForwarder(self.hive)
-	//glog.V(logger.Debug).Infof("-> set swarm forwarder as cloud storage backend")

 	// setup cloud storage internal access layer
 	self.storage = storage.NewNetStore(hash, self.lstore, nil, config.StoreParams)
-	glog.V(logger.Debug).Infof("-> swarm net store shared access layer to Swarm Chunk Store")
-
-	// set up Depo (storage handler = cloud storage access layer for incoming remote requests)
-	// self.depo = network.NewDepo(hash, self.lstore, self.storage)
-	// glog.V(logger.Debug).Infof("-> REmote Access to CHunks")
+	log.Debug("-> swarm net store shared access layer to Swarm Chunk Store")

 	// set up DPA, the cloud storage local access layer
 	dpaChunkStore := storage.NewDpaChunkStore(self.lstore, self.storage)
 	log.Debug(fmt.Sprintf("-> Local Access to Swarm"))
+
 	// Swarm Hash Merklised Chunking for Arbitrary-length Document/File storage
 	self.dpa = storage.NewDPA(dpaChunkStore, self.config.ChunkerParams)
 	log.Debug(fmt.Sprintf("-> Content Store API"))
@@ -156,6 +142,7 @@ func NewSwarm(ctx *node.ServiceContext, backend chequebook.Backend, config *api.
 	log.Debug(fmt.Sprintf("-> Swarm Domain Name Registrar @ address %v", config.EnsRoot.Hex()))

 	self.api = api.NewApi(self.dpa, self.dns)
+
 	// Manifests for Smart Hosting
 	log.Debug(fmt.Sprintf("-> Web3 virtual server API"))

@@ -177,14 +164,6 @@ Start is called when the stack is started
 */
 // implements the node.Service interface
 func (self *Swarm) Start(net p2p.Server) error {
-	connectPeer := func(url string) error {
-		node, err := discover.ParseNode(url)
-		if err != nil {
-			return fmt.Errorf("invalid node URL: %v", err)
-		}
-		net.AddPeer(node)
-		return nil
-	}
 	// set chequebook
 	if self.swapEnabled {
 		ctx := context.Background() // The initial setup has no deadline.
@@ -198,21 +177,25 @@ func (self *Swarm) Start(net p2p.Server) error {
 	}

 	log.Warn(fmt.Sprintf("Starting Swarm service"))
-
-	glog.V(logger.Warn).Infof("Starting Swarm service")
+
 	self.hive.Start(
-		connectPeer,
+		net,
 		func() <-chan time.Time {
 			return time.NewTicker(time.Second).C
 		},
 	)
-	log.Info(fmt.Sprintf("Swarm network started on bzz address: %v", self.hive.Addr()))
+	log.Info(fmt.Sprintf("Swarm network started on bzz address: %v", self.hive.GetAddr()))

 	if self.pssEnabled {
 		pssparams := network.NewPssParams()
 		self.pss = network.NewPss(self.hive.Overlay, pssparams)
-		glog.V(logger.Info).Infof("Pss started: %v", self.pss)
+
+		// for testing purposes, should be removed in production environment!!
+		pingtopic, _ := network.MakeTopic("pss", 1)
+		self.pss.Register(pingtopic, self.pss.GetPingHandler())
+
+		log.Debug(fmt.Sprintf("Pss started: %v", self.pss))
 	}

 	self.dpa.Start()
@@ -255,7 +238,6 @@ func (self *Swarm) Stop() error {
 // implements the node.Service interface
 func (self *Swarm) Protocols() []p2p.Protocol {
-	//proto, err := network.Bzz(self.depo, self.backend, self.hive, self.dbAccess, self.config.Swap, self.config.SyncParams, self.config.NetworkId)
 	ct := network.BzzCodeMap()
 	for _, m := range network.DiscoveryMsgs {
 		ct.Register(m)
@@ -266,8 +248,26 @@ func (self *Swarm) Protocols() []p2p.Protocol {

 	srv := func(p network.Peer) error {
 		if self.pssEnabled {
-			//p.Register(&PssMsg{}, self.pssFunc)
-			glog.V(logger.Warn).Infof("pss is enabled, but handler not yet implemented - it won't work yet, sorry")
+			p.Register(&network.PssMsg{}, func(msg interface{}) error {
+				pssmsg := msg.(*network.PssMsg)
+
+				if self.pss.IsSelfRecipient(pssmsg) {
+					log.Trace("pss for us, yay! ... let's process!")
+					env := pssmsg.Payload
+					umsg := env.Payload
+					f := self.pss.GetHandler(env.Topic)
+					if f == nil {
+						return fmt.Errorf("No registered handler for topic '%s'", env.Topic)
+					}
+					nid := adapters.NewNodeId(env.SenderUAddr)
+					p := p2p.NewPeer(nid.NodeID, fmt.Sprintf("%x", common.ByteLabel(nid.Bytes())), []p2p.Cap{})
+					return f(umsg, p, env.SenderOAddr)
+				} else {
+					log.Trace("pss was for someone else :'( ... forwarding")
+					return self.pss.Forward(pssmsg)
+				}
+			})
 		}
 		self.hive.Add(p)
 		p.DisconnectHook(func(err error) {
@@ -291,7 +291,8 @@ func (self *Swarm) Protocols() []p2p.Protocol {
 // implements node.Service
 // Apis returns the RPC Api descriptors the Swarm implementation offers
 func (self *Swarm) APIs() []rpc.API {
-	return []rpc.API{
+
+	apis := []rpc.API{
 		// public APIs
 		{
 			Namespace: "bzz",
@@ -334,6 +335,17 @@ func (self *Swarm) APIs() []rpc.API {
 		},
 		// {Namespace, Version, api.NewAdmin(self), false},
 	}
+
+	if self.pssEnabled {
+		apis = append(apis, rpc.API{
+			Namespace: "eth",
+			Version:   "0.1/pss",
+			Service:   network.NewPssApi(self.pss),
+			Public:    true,
+		})
+	}
+
+	return apis
 }

 func (self *Swarm) Api() *api.Api {
@@ -348,7 +360,6 @@ func (self *Swarm) SetChequebook(ctx context.Context) error {
 	}
 	log.Info(fmt.Sprintf("new chequebook set (%v): saving config file, resetting all connections in the hive", self.config.Swap.Contract.Hex()))
 	self.config.Save()
-	//self.hive.DropAll()
 	return nil
}
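Taken together, the inbound path this diff adds can be read as a single decision per incoming PssMsg, appearing both in swarm.go's Protocols() and in the test helpers. Condensed for reference (a sketch using names from this diff, not additional patch content):

```go
func handleIncoming(pss *network.Pss, msg interface{}) error {
	pssmsg := msg.(*network.PssMsg)
	if !pss.IsSelfRecipient(pssmsg) {
		// not addressed to us: relay towards the recipient, caching the digest
		return pss.Forward(pssmsg)
	}
	env := pssmsg.Payload
	f := pss.GetHandler(env.Topic)
	if f == nil {
		return fmt.Errorf("no registered handler for topic '%s'", env.Topic)
	}
	// the registered handler also fans the payload out to rpc subscribers via the feed
	nid := adapters.NewNodeId(env.SenderUAddr)
	p := p2p.NewPeer(nid.NodeID, fmt.Sprintf("%x", common.ByteLabel(nid.Bytes())), []p2p.Cap{})
	return f(env.Payload, p, env.SenderOAddr)
}
```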