Skip to content
This repository has been archived by the owner on Aug 2, 2021. It is now read-only.

Commit

Permalink
bzzeth: cleanup
Browse files Browse the repository at this point in the history
- remove unused code
- temp remove wrapping in p2p/testing
- fix decoding in protocols pkg
- fix p2p.Protocol fields from Spec
- fix tests to check peer added
- check serveHeaders field set
  • Loading branch information
zelig committed Jul 17, 2019
1 parent c857568 commit 77da2a9
Show file tree
Hide file tree
Showing 7 changed files with 134 additions and 170 deletions.
93 changes: 40 additions & 53 deletions bzzeth/bzzeth.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,38 +32,27 @@ import (
// BzzEth implements node.Service
var _ node.Service = &BzzEth{}

var Spec = &protocols.Spec{
Name: "bzzeth",
Version: 1,
MaxMsgSize: 10 * 1024 * 1024,
Messages: []interface{}{
Handshake{},
NewBlockHeaders{},
GetBlockHeaders{},
BlockHeaders{},
},
}

// BzzEth is a global module handling ethereum state on swarm
type BzzEth struct {
mtx sync.Mutex
peers map[enode.ID]*Peer
spec *protocols.Spec

netStore *storage.NetStore

quit chan struct{}
mtx sync.Mutex // lock for peer pool
peers map[enode.ID]*Peer // peer pool
netStore *storage.NetStore // netstore to retrieve and store
quit chan struct{} // quit channel to close go routines
}

// New constructs the BzzEth node service
func New(ns *storage.NetStore) *BzzEth {
bzzeth := &BzzEth{
return &BzzEth{
peers: make(map[enode.ID]*Peer),
netStore: ns,
quit: make(chan struct{}),
}
}

bzzeth.spec = Spec

return bzzeth
func (b *BzzEth) getPeer(id enode.ID) *Peer {
b.mtx.Lock()
defer b.mtx.Unlock()
return b.peers[id]
}

func (b *BzzEth) addPeer(p *Peer) {
Expand All @@ -78,24 +67,30 @@ func (b *BzzEth) removePeer(p *Peer) {
delete(b.peers, p.ID())
}

// Run is the bzzeth protocol run function.
// - creates a peer
// - checks if it is a swarm node, put the protocol in idle mode
// - performs handshake
// - adds it the peerpool
// - starts handler loop
func (b *BzzEth) Run(p *p2p.Peer, rw p2p.MsgReadWriter) error {
peer := protocols.NewPeer(p, rw, b.spec)
peer := protocols.NewPeer(p, rw, Spec)
bp := NewPeer(peer)
b.addPeer(bp)

defer b.removePeer(bp)
defer close(bp.quit)

// perform handshake and register if peer serves headers
handshake, err := bp.Handshake(context.TODO(), Handshake{ServeHeaders: true}, nil)
if err != nil {
return err
}
bp.servesHeaders = handshake.(*Handshake).ServeHeaders
bp.serveHeaders = handshake.(*Handshake).ServeHeaders
log.Warn("handshake", "hs", handshake, "peer", bp)
// with another swarm node the protocol goes into idle
if isSwarmNodeFunc(bp) {
// swarm node - do nothing
<-b.quit
return nil
}
b.addPeer(bp)
defer b.removePeer(bp)

return peer.Run(b.HandleMsg(bp))
}
Expand Down Expand Up @@ -128,49 +123,41 @@ func (b *BzzEth) handleBlockHeaders(ctx context.Context, p *Peer, msg *BlockHead
log.Debug("bzzeth.handleBlockHeaders")
}

// Protocols returns the p2p protocol
func (b *BzzEth) Protocols() []p2p.Protocol {
return []p2p.Protocol{
{
Name: "bzzeth",
Version: 1,
Length: 10 * 1024 * 1024,
Name: Spec.Name,
Version: Spec.Version,
Length: Spec.Length(),
Run: b.Run,
},
}
}

// APIs return APIs defined on the node service
func (b *BzzEth) APIs() []rpc.API {
return []rpc.API{
{
Namespace: "bzzeth",
Version: "1.0",
Service: NewAPI(b),
Public: false,
},
}
}

// Additional public methods accessible through API for pss
type API struct {
*BzzEth
}

func NewAPI(b *BzzEth) *API {
return &API{BzzEth: b}
return nil
}

// Start starts the BzzEth node service
func (b *BzzEth) Start(server *p2p.Server) error {
log.Info("bzz-eth starting...")
log.Info("bzzeth starting...")
return nil
}

// Stop stops the BzzEth node service
func (b *BzzEth) Stop() error {
log.Info("bzz-eth shutting down...")
log.Info("bzzeth shutting down...")
close(b.quit)
return nil
}

// this function is called to check if the remote peer is another swarm node
// in which case the protocol is idle
// can be reassigned in test to mock a swarm node
var isSwarmNodeFunc = isSwarmNode

func isSwarmNode(p *Peer) (yes bool) {
func isSwarmNode(p *Peer) bool {
return p.HasCap("bzz")
}
88 changes: 29 additions & 59 deletions bzzeth/bzzeth_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,15 @@ import (
"flag"
"os"
"testing"
"time"

"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/log"
p2ptest "github.com/ethereum/go-ethereum/p2p/testing"
)

var (
loglevel = flag.Int("loglevel", 5, "verbosity of logs")
loglevel = flag.Int("loglevel", 3, "verbosity of logs")
)

func init() {
Expand All @@ -38,15 +39,6 @@ func init() {
log.Root().SetHandler(log.LvlFilterHandler(log.Lvl(*loglevel), log.StreamHandler(os.Stderr, log.TerminalFormat(false))))
}

/*
test case:
1. swarm-to-swarm connection
2. swarm-to-eth node
*/

func newBzzEthTester() (*p2ptest.ProtocolTester, *BzzEth, func(), error) {
b := New(nil)

Expand All @@ -63,6 +55,9 @@ func newBzzEthTester() (*p2ptest.ProtocolTester, *BzzEth, func(), error) {
return protocolTester, b, teardown, nil
}

// tests handshake between eth node and swarm node
// on successful handshake the protocol does not go idle
// and serves headers is registered
func TestBzzEthHandshake(t *testing.T) {
tester, b, teardown, err := newBzzEthTester()
if err != nil {
Expand All @@ -78,7 +73,7 @@ func TestBzzEthHandshake(t *testing.T) {
Triggers: []p2ptest.Trigger{
{
Code: 0,
Msg: &Handshake{
Msg: Handshake{
ServeHeaders: true,
},
Peer: node.ID(),
Expand All @@ -87,8 +82,8 @@ func TestBzzEthHandshake(t *testing.T) {
Expects: []p2ptest.Expect{
{
Code: 0,
Msg: &Handshake{
ServeHeaders: false,
Msg: Handshake{
ServeHeaders: true,
},
Peer: node.ID(),
},
Expand All @@ -98,16 +93,27 @@ func TestBzzEthHandshake(t *testing.T) {
if err != nil {
t.Fatalf("Got %v", err)
}
var p *Peer
for i := 0; i < 10; i++ {
p = b.getPeer(node.ID())
if p != nil {
break
}
time.Sleep(100 * time.Millisecond)
}
if p == nil {
t.Fatal("bzzeth peer not added")
}
if !p.serveHeaders {
t.Fatal("bzzeth peer serveHeaders not set")
}

close(b.quit)

err = tester.TestDisconnected()
if err != nil {
t.Fatal(err)
}
if err != nil {
t.Fatal(err)
}
}

// TestBzzBzzHandshake tests that a handshake between two Swarm nodes
Expand Down Expand Up @@ -141,7 +147,7 @@ func TestBzzBzzHandshake(t *testing.T) {
{
Code: 0,
Msg: &Handshake{
ServeHeaders: false,
ServeHeaders: true,
},
Peer: node.ID(),
},
Expand All @@ -152,52 +158,16 @@ func TestBzzBzzHandshake(t *testing.T) {
t.Fatalf("Got %v", err)
}

p := b.getPeer(node.ID())
if p != nil {
t.Fatal("bzzeth swarm peer incorrectly added")
}

close(b.quit)

err = tester.TestDisconnected(&p2ptest.Disconnect{Peer: node.ID(), Error: errors.New("protocol returned")})
if err != nil {
t.Fatal(err)
}
}

//func TestNodesCanTalk(t *testing.T) {
//nodeCount := 2

//// create a standard sim
//sim := simulation.NewInProc(map[string]simulation.ServiceFunc{
//"bzz-eth": func(ctx *adapters.ServiceContext, bucket *sync.Map) (s node.Service, cleanup func(), err error) {
//addr := network.NewAddr(ctx.Config.Node())

//o := New(nil)
//cleanup = func() {
//}

//return o, cleanup, nil
//},
//})
//defer sim.Close()

//ctx, cancel := context.WithTimeout(context.Background(), 3*time.Minute)
//defer cancel()
//_, err := sim.AddNodesAndConnectStar(nodeCount)
//if err != nil {
//t.Fatal(err)
//}

////run the simulation
//result := sim.Run(ctx, func(ctx context.Context, sim *simulation.Simulation) error {
//log.Info("Simulation running")
//_ = sim.Net.Nodes

////wait until all subscriptions are done
//select {
//case <-ctx.Done():
//return errors.New("Context timed out")
//}

//return nil
//})
//if result.Error != nil {
//t.Fatal(result.Error)
//}
//}
}
18 changes: 3 additions & 15 deletions bzzeth/peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,28 +17,16 @@
package bzzeth

import (
"errors"

"github.com/ethersphere/swarm/p2p/protocols"
)

// ErrMaxPeerServers will be returned if peer server limit is reached.
// It will be sent in the SubscribeErrorMsg.
var ErrMaxPeerServers = errors.New("max peer servers")

// Peer is the Peer extension for the streaming protocol
// Peer extends p2p/protocols Peer
type Peer struct {
*protocols.Peer

servesHeaders bool
quit chan struct{}
serveHeaders bool // if the remote serves headers
}

// NewPeer is the constructor for Peer
func NewPeer(peer *protocols.Peer) *Peer {
p := &Peer{
Peer: peer,
quit: make(chan struct{}),
}
return p
return &Peer{Peer: peer}
}
Loading

0 comments on commit 77da2a9

Please sign in to comment.