diff --git a/bzzeth/bzzeth.go b/bzzeth/bzzeth.go new file mode 100644 index 0000000000..27ff93f4a9 --- /dev/null +++ b/bzzeth/bzzeth.go @@ -0,0 +1,121 @@ +// Copyright 2019 The Swarm Authors +// This file is part of the Swarm library. +// +// The Swarm library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The Swarm library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the Swarm library. If not, see . + +package bzzeth + +import ( + "context" + + "github.com/ethereum/go-ethereum/node" + "github.com/ethereum/go-ethereum/p2p" + "github.com/ethereum/go-ethereum/rpc" + "github.com/ethersphere/swarm/log" + "github.com/ethersphere/swarm/network" + "github.com/ethersphere/swarm/p2p/protocols" + "github.com/ethersphere/swarm/storage" +) + +// BzzEth implements node.Service +var _ node.Service = &BzzEth{} + +// BzzEth is a global module handling ethereum state on swarm +type BzzEth struct { + peers *peers // bzzeth peer pool + netStore *storage.NetStore // netstore to retrieve and store + kad *network.Kademlia // kademlia to determine if a header chunk belongs to us + quit chan struct{} // quit channel to close go routines +} + +// New constructs the BzzEth node service +func New(ns *storage.NetStore, kad *network.Kademlia) *BzzEth { + return &BzzEth{ + peers: newPeers(), + netStore: ns, + kad: kad, + quit: make(chan struct{}), + } +} + +// Run is the bzzeth protocol run function. +// - creates a peer +// - checks if it is a swarm node, put the protocol in idle mode +// - performs handshake +// - adds peer to the peerpool +// - starts incoming message handler loop +func (b *BzzEth) Run(p *p2p.Peer, rw p2p.MsgReadWriter) error { + peer := protocols.NewPeer(p, rw, Spec) + bp := NewPeer(peer) + + // perform handshake and register if peer serves headers + handshake, err := bp.Handshake(context.TODO(), Handshake{ServeHeaders: true}, nil) + if err != nil { + return err + } + bp.serveHeaders = handshake.(*Handshake).ServeHeaders + log.Warn("handshake", "hs", handshake, "peer", bp) + // with another swarm node the protocol goes into idle + if isSwarmNodeFunc(bp) { + <-b.quit + return nil + } + b.peers.add(bp) + defer b.peers.remove(bp) + + return peer.Run(b.handleMsg(bp)) +} + +// handleMsg is the message handler that delegates incoming messages +// handlers are called asynchronously so handler calls do not block incoming msg processing +func (b *BzzEth) handleMsg(p *Peer) func(context.Context, interface{}) error { + return func(ctx context.Context, msg interface{}) error { + p.logger.Debug("bzzeth.handleMsg") + switch msg.(type) { + default: + log.Info("Received a message ") + } + return nil + } +} + +// Protocols returns the p2p protocol +func (b *BzzEth) Protocols() []p2p.Protocol { + return []p2p.Protocol{ + { + Name: Spec.Name, + Version: Spec.Version, + Length: Spec.Length(), + Run: b.Run, + }, + } +} + +// APIs return APIs defined on the node service +func (b *BzzEth) APIs() []rpc.API { + return nil +} + +// Start starts the BzzEth node service +func (b *BzzEth) Start(server *p2p.Server) error { + log.Info("bzzeth starting...") + return nil +} + +// Stop stops the BzzEth node service +func (b *BzzEth) Stop() error { + log.Info("bzzeth shutting down...") + close(b.quit) + return nil +} diff --git a/bzzeth/bzzeth_test.go b/bzzeth/bzzeth_test.go new file mode 100644 index 0000000000..cddcc98d2e --- /dev/null +++ b/bzzeth/bzzeth_test.go @@ -0,0 +1,157 @@ +// Copyright 2019 The Swarm Authors +// This file is part of the Swarm library. +// +// The Swarm library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The Swarm library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the Swarm library. If not, see . + +package bzzeth + +import ( + "errors" + "flag" + "os" + "testing" + "time" + + "github.com/ethereum/go-ethereum/crypto" + "github.com/ethereum/go-ethereum/log" + "github.com/ethereum/go-ethereum/p2p/enode" + p2ptest "github.com/ethersphere/swarm/p2p/testing" +) + +var ( + loglevel = flag.Int("loglevel", 0, "verbosity of logs") +) + +func init() { + flag.Parse() + + log.PrintOrigins(true) + log.Root().SetHandler(log.LvlFilterHandler(log.Lvl(*loglevel), log.StreamHandler(os.Stderr, log.TerminalFormat(false)))) +} + +func newBzzEthTester() (*p2ptest.ProtocolTester, *BzzEth, func(), error) { + b := New(nil, nil) + + prvkey, err := crypto.GenerateKey() + if err != nil { + return nil, nil, nil, err + } + + protocolTester := p2ptest.NewProtocolTester(prvkey, 1, b.Run) + teardown := func() { + protocolTester.Stop() + } + + return protocolTester, b, teardown, nil +} + +func handshakeExchange(tester *p2ptest.ProtocolTester, peerID enode.ID, serveHeadersPeer, serveHeadersPivot bool) error { + return tester.TestExchanges( + p2ptest.Exchange{ + Label: "Handshake", + Triggers: []p2ptest.Trigger{ + { + Code: 0, + Msg: Handshake{ + ServeHeaders: serveHeadersPeer, + }, + Peer: peerID, + }, + }, + Expects: []p2ptest.Expect{ + { + Code: 0, + Msg: Handshake{ + ServeHeaders: serveHeadersPivot, + }, + Peer: peerID, + }, + }, + }) +} + +// tests handshake between eth node and swarm node +// on successful handshake the protocol does not go idle +// peer added to the pool and serves headers is registered +func TestBzzEthHandshake(t *testing.T) { + tester, b, teardown, err := newBzzEthTester() + if err != nil { + t.Fatal(err) + } + defer teardown() + + node := tester.Nodes[0] + err = handshakeExchange(tester, node.ID(), true, true) + if err != nil { + t.Fatalf("expected no error, got %v", err) + } + + // after successful handshake, expect peer added to peer pool + var p *Peer + for i := 0; i < 10; i++ { + p = b.peers.get(node.ID()) + if p != nil { + break + } + time.Sleep(100 * time.Millisecond) + } + if p == nil { + t.Fatal("bzzeth peer not added") + } + + if !p.serveHeaders { + t.Fatal("bzzeth peer serveHeaders not set") + } + + close(b.quit) + err = tester.TestDisconnected(&p2ptest.Disconnect{Peer: node.ID(), Error: errors.New("?")}) + if err == nil || err.Error() != "timed out waiting for peers to disconnect" { + t.Fatal(err) + } +} + +// TestBzzBzzHandshake tests that a handshake between two Swarm nodes +func TestBzzBzzHandshake(t *testing.T) { + tester, b, teardown, err := newBzzEthTester() + if err != nil { + t.Fatal(err) + } + defer teardown() + + // redefine isSwarmNodeFunc to force recognise remote peer as swarm node + defer func(f func(*Peer) bool) { + isSwarmNodeFunc = f + }(isSwarmNodeFunc) + isSwarmNodeFunc = func(_ *Peer) bool { return true } + + node := tester.Nodes[0] + err = handshakeExchange(tester, node.ID(), false, true) + if err != nil { + t.Fatalf("expected no error, got %v", err) + } + + // after handshake expect protocol to hang, peer not added to pool + p := b.peers.get(node.ID()) + if p != nil { + t.Fatal("bzzeth swarm peer incorrectly added") + } + + // after closing the ptotocall, expect disconnect + close(b.quit) + err = tester.TestDisconnected(&p2ptest.Disconnect{Peer: node.ID(), Error: errors.New("protocol returned")}) + if err != nil { + t.Fatal(err) + } + +} diff --git a/bzzeth/peer.go b/bzzeth/peer.go new file mode 100644 index 0000000000..0deb1b072d --- /dev/null +++ b/bzzeth/peer.go @@ -0,0 +1,154 @@ +// Copyright 2019 The Swarm Authors +// This file is part of the Swarm library. +// +// The Swarm library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The Swarm library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the Swarm library. If not, see . + +package bzzeth + +import ( + "math/rand" + "sync" + + "github.com/ethereum/go-ethereum/log" + "github.com/ethereum/go-ethereum/p2p/enode" + "github.com/ethersphere/swarm/p2p/protocols" +) + +// Peer extends p2p/protocols Peer and represents a conrete protocol connection +type Peer struct { + *protocols.Peer // embeds protocols.Peer + serveHeaders bool // if the remote serves headers + requests *requests // per-peer pool of open requests + logger log.Logger // custom logger for peer +} + +// NewPeer is the constructor for Peer +func NewPeer(peer *protocols.Peer) *Peer { + return &Peer{ + Peer: peer, + requests: newRequests(), + logger: log.New("peer", peer.ID()), + } +} + +// peers represents the bzzeth specific peer pool +type peers struct { + mtx sync.RWMutex + peers map[enode.ID]*Peer +} + +func newPeers() *peers { + return &peers{peers: make(map[enode.ID]*Peer)} +} + +func (p *peers) get(id enode.ID) *Peer { + p.mtx.RLock() + defer p.mtx.RUnlock() + return p.peers[id] +} + +func (p *peers) add(peer *Peer) { + p.mtx.Lock() + defer p.mtx.Unlock() + p.peers[peer.ID()] = peer +} + +func (p *peers) remove(peer *Peer) { + p.mtx.Lock() + defer p.mtx.Unlock() + delete(p.peers, peer.ID()) +} + +// getEthPeer finds a peer that serves headers and calls the function argument on this peer +// TODO: implement load balancing of requests in case of multiple peers +func (p *peers) getEth() (peer *Peer) { + p.mtx.RLock() + defer p.mtx.RUnlock() + for _, peer = range p.peers { + if peer.serveHeaders { + break + } + } + return peer +} + +// requests represents the peer specific pool of open requests +type requests struct { + mtx sync.RWMutex // for concurrent access to requests + r map[uint32]*request // requests open for peer +} + +type request struct { + hashes map[string]bool + c chan []byte + cancel func() +} + +// newRequestIDFunc is used to generated unique ID for requests +// tests can reassign for deterministic ids +var newRequestIDFunc = newRequestID + +// newRequestID generates a 32-bit random number to be used as unique id for s +// no reuse of id across peers +func newRequestID() uint32 { + return rand.Uint32() +} + +func newRequests() *requests { + return &requests{ + r: make(map[uint32]*request), + } +} + +// create constructs a new request +// registers it on the peer request pool +// request.cancel() should be called to cleanup +func (r *requests) create(c chan []byte) *request { + req := &request{ + hashes: make(map[string]bool), + c: c, + } + id := newRequestIDFunc() + req.cancel = func() { r.remove(id) } + r.add(id, req) + return req +} + +func (r *requests) add(id uint32, req *request) { + r.mtx.Lock() + defer r.mtx.Unlock() + r.r[id] = req +} + +func (r *requests) remove(id uint32) { + r.mtx.Lock() + defer r.mtx.Unlock() + delete(r.r, id) +} + +func (r *requests) get(id uint32) (*request, bool) { + r.mtx.RLock() + defer r.mtx.RUnlock() + req, ok := r.r[id] + return req, ok +} + +// this function is called to check if the remote peer is another swarm node +// in which case the protocol is idle +// can be reassigned in test to mock a swarm node +var isSwarmNodeFunc = isSwarmNode + +func isSwarmNode(p *Peer) bool { + return p.HasCap("bzz") +} diff --git a/bzzeth/wire.go b/bzzeth/wire.go new file mode 100644 index 0000000000..db91d68bf8 --- /dev/null +++ b/bzzeth/wire.go @@ -0,0 +1,35 @@ +// Copyright 2019 The Swarm Authors +// This file is part of the Swarm library. +// +// The Swarm library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The Swarm library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the Swarm library. If not, see . + +package bzzeth + +import "github.com/ethersphere/swarm/p2p/protocols" + +// Spec is the protocol spec for bzzeth +var Spec = &protocols.Spec{ + Name: "bzzeth", + Version: 1, + MaxMsgSize: 10 * 1024 * 1024, + Messages: []interface{}{ + Handshake{}, + }, + DisableContext: true, +} + +// Handshake is used in between the ethereum node and the Swarm node +type Handshake struct { + ServeHeaders bool // indicates if this node is expected to serve requests for headers +} diff --git a/swarm.go b/swarm.go index 881aaece3e..47bafa19bf 100644 --- a/swarm.go +++ b/swarm.go @@ -41,6 +41,7 @@ import ( "github.com/ethereum/go-ethereum/rpc" "github.com/ethersphere/swarm/api" httpapi "github.com/ethersphere/swarm/api/http" + "github.com/ethersphere/swarm/bzzeth" "github.com/ethersphere/swarm/chunk" "github.com/ethersphere/swarm/contracts/chequebook" "github.com/ethersphere/swarm/contracts/ens" @@ -74,6 +75,7 @@ type Swarm struct { dns api.Resolver // DNS registrar fileStore *storage.FileStore // distributed preimage archive, the local API to the storage with document level storage/retrieval support streamer *stream.Registry + bzzEth *bzzeth.BzzEth bzz *network.Bzz // the logistic manager backend chequebook.Backend // simple blockchain Backend privateKey *ecdsa.PrivateKey @@ -212,6 +214,7 @@ func NewSwarm(config *api.Config, mockStore *mock.NodeStore) (self *Swarm, err e MaxPeerServers: config.MaxStreamPeerServers, } self.streamer = stream.NewRegistry(nodeID, delivery, self.netStore, self.stateStore, registryOptions, self.swap) + tags := chunk.NewTags() //todo load from state store // Swarm Hash Merklised Chunking for Arbitrary-length Document/File storage @@ -222,6 +225,8 @@ func NewSwarm(config *api.Config, mockStore *mock.NodeStore) (self *Swarm, err e self.bzz = network.NewBzz(bzzconfig, to, self.stateStore, self.streamer.GetSpec(), self.streamer.Run) + self.bzzEth = bzzeth.New(self.netStore, to) + // Pss = postal service over swarm (devp2p over bzz) self.ps, err = pss.New(to, config.Pss) if err != nil { @@ -382,6 +387,11 @@ func (s *Swarm) Start(srv *p2p.Server) error { } log.Info("Swarm network started", "bzzaddr", fmt.Sprintf("%x", s.bzz.Hive.BaseAddr())) + err = s.bzzEth.Start(srv) + if err != nil { + return err + } + if s.ps != nil { s.ps.Start(srv) } @@ -466,7 +476,12 @@ func (s *Swarm) Stop() error { stopCounter.Inc(1) s.streamer.Stop() - err := s.bzz.Stop() + err := s.bzzEth.Stop() + if err != nil { + log.Error("error during bzz-eth shutdown", "err", err) + } + + err = s.bzz.Stop() if s.stateStore != nil { s.stateStore.Close() } @@ -487,7 +502,7 @@ func (s *Swarm) Protocols() (protos []p2p.Protocol) { protos = append(protos, s.bzz.Protocols()...) } else { protos = append(protos, s.bzz.Protocols()...) - + protos = append(protos, s.bzzEth.Protocols()...) if s.ps != nil { protos = append(protos, s.ps.Protocols()...) } @@ -534,7 +549,7 @@ func (s *Swarm) APIs() []rpc.API { } apis = append(apis, s.bzz.APIs()...) - + apis = append(apis, s.bzzEth.APIs()...) apis = append(apis, s.streamer.APIs()...) if s.ps != nil {