Skip to content

Commit

Permalink
Merge pull request ethereum#77 from ethersphere/swarm-pss-ws
Browse files Browse the repository at this point in the history
PSS events feed and websockets
  • Loading branch information
zelig authored May 1, 2017
2 parents f9dd317 + d0a3976 commit ba3d785
Show file tree
Hide file tree
Showing 8 changed files with 512 additions and 293 deletions.
15 changes: 14 additions & 1 deletion cmd/swarm/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,10 +116,15 @@ var (
SwarmUploadMimeType = cli.StringFlag{
Name: "mime",
Usage: "force mime type",
}
PssEnabledFlag = cli.BoolFlag{
Name: "pss",
Usage: "Enable pss (message passing over swarm)",
}
PssPortFlag = cli.IntFlag{
Name: "pssport",
Usage: fmt.Sprintf("Websockets port for pss (default %d)", node.DefaultWSPort),
}
CorsStringFlag = cli.StringFlag{
Name: "corsdomain",
Usage: "Domain on which to send Access-Control-Allow-Origin header (multiple domains can be supplied separated by a ',')",
Expand Down Expand Up @@ -265,6 +270,7 @@ Cleans database of corrupted entries.
SwarmUploadMimeType,
// pss flags
PssEnabledFlag,
PssPortFlag,
}
app.Flags = append(app.Flags, debug.Flags...)
app.Before = func(ctx *cli.Context) error {
Expand Down Expand Up @@ -300,12 +306,19 @@ func version(ctx *cli.Context) error {

func bzzd(ctx *cli.Context) error {
cfg := defaultNodeConfig
if ctx.GlobalIsSet(PssEnabledFlag.Name) {
cfg.WSHost = "127.0.0.1"
cfg.WSModules = []string{"eth","pss"}
cfg.WSOrigins = []string{"*"}
if ctx.GlobalIsSet(PssPortFlag.Name) {
cfg.WSPort = ctx.GlobalInt(PssPortFlag.Name)
}
}
utils.SetNodeConfig(ctx, &cfg)
stack, err := node.New(&cfg)
if err != nil {
utils.Fatalf("can't create node: %v", err)
}

registerBzzService(ctx, stack)
utils.StartNode(stack)

Expand Down
4 changes: 2 additions & 2 deletions p2p/testing/protocolsession.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import (

type ProtocolSession struct {
TestNodeAdapter
Ids []*adapters.NodeId
Ids []*adapters.NodeId
ignore []uint64
}

Expand Down Expand Up @@ -142,7 +142,7 @@ func (self *ProtocolSession) expect(exp Expect) error {
} else {
log.Warn("expectmsg errormsg parse error?!")
}
}
}
} else {
log.Warn("expectmsg errormsg parse error?!")
break
Expand Down
54 changes: 0 additions & 54 deletions swarm/api/ws/server.go

This file was deleted.

62 changes: 0 additions & 62 deletions swarm/api/ws/server_test.go

This file was deleted.

55 changes: 49 additions & 6 deletions swarm/network/pss.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"time"

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/event"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/p2p"
"github.com/ethereum/go-ethereum/p2p/adapters"
Expand All @@ -27,6 +28,8 @@ const (
digestLength = 64
digestCapacity = 256
defaultDigestCacheTTL = time.Second
pingTopicName = "pss"
pingTopicVersion = 1
)

var (
Expand Down Expand Up @@ -121,6 +124,7 @@ type Pss struct {
//peerPool map[pot.Address]map[PssTopic]*PssReadWriter // keep track of all virtual p2p.Peers we are currently speaking to
peerPool map[pot.Address]map[PssTopic]p2p.MsgReadWriter // keep track of all virtual p2p.Peers we are currently speaking to
handlers map[PssTopic]func([]byte, *p2p.Peer, []byte) error // topic and version based pss payload handlers
events map[PssTopic]*event.Feed // subscriptions for each topic
fwdcache map[pssDigest]pssCacheEntry // checksum of unique fields from pssmsg mapped to expiry, cache to determine whether to drop msg
cachettl time.Duration // how long to keep messages in fwdcache
hasher func(string) storage.Hasher // hasher to digest message to cache
Expand Down Expand Up @@ -148,6 +152,7 @@ func NewPss(k Overlay, params *PssParams) *Pss {
//peerPool: make(map[pot.Address]map[PssTopic]*PssReadWriter, PssPeerCapacity),
peerPool: make(map[pot.Address]map[PssTopic]p2p.MsgReadWriter, PssPeerCapacity),
handlers: make(map[PssTopic]func([]byte, *p2p.Peer, []byte) error),
events: make(map[PssTopic]*event.Feed),
fwdcache: make(map[pssDigest]pssCacheEntry),
cachettl: params.Cachettl,
hasher: storage.MakeHashFunc,
Expand Down Expand Up @@ -210,11 +215,24 @@ func (self *Pss) checkFwdCache(addr []byte, digest pssDigest) bool {
func (self *Pss) Register(topic PssTopic, handler func(msg []byte, p *p2p.Peer, from []byte) error) error {
self.lock.Lock()
defer self.lock.Unlock()
self.handlers[topic] = handler
self.handlers[topic] = func(msg []byte, p *p2p.Peer, from []byte) error {
self.alertSubscribers(&topic, msg)
return handler(msg, p, from)
}
self.registerFeed(topic)
return nil
}

// Retrieves the handler function registered by *Pss.Register()
func (self *Pss) Subscribe(topic *PssTopic, ch chan []byte) (event.Subscription, error) {
_, ok := self.events[*topic]
if !ok {
return nil, fmt.Errorf("No feed registered for topic %v", topic)
}
sub := self.events[*topic].Subscribe(ch)
log.Trace("new pss subscribe", "topic", topic, "sub", sub)
return sub, nil
}

func (self *Pss) GetHandler(topic PssTopic) func([]byte, *p2p.Peer, []byte) error {
self.lock.Lock()
defer self.lock.Unlock()
Expand Down Expand Up @@ -263,6 +281,20 @@ func (self *Pss) isActive(id pot.Address, topic PssTopic) bool {
return true
}

func (self *Pss) registerFeed(topic PssTopic) {
self.events[topic] = &event.Feed{}
}

func (self *Pss) alertSubscribers(topic *PssTopic, msg []byte) error {
feed, ok := self.events[*topic]
if !ok {
return fmt.Errorf("No subscriptions registered for topic %v", topic)
}
numsent := feed.Send(msg)
log.Trace(fmt.Sprintf("pss sent to %d subscribers", numsent))
return nil
}

// Sends a message using pss. The message could be anything at all, and will be handled by whichever handler function is mapped to PssTopic using *Pss.Register()
//
// The to address is a swarm overlay address
Expand All @@ -289,7 +321,7 @@ func (self *Pss) Send(to []byte, topic PssTopic, msg []byte) error {
// Handlers that want to pass on a message should call this directly
func (self *Pss) Forward(msg *PssMsg) error {

if self.isSelfRecipient(msg) {
if self.IsSelfRecipient(msg) {
return errorForwardToSelf
}

Expand Down Expand Up @@ -326,7 +358,7 @@ func (self *Pss) Forward(msg *PssMsg) error {
return true
})
if sent == 0 {
log.Warn("PSS Was not able to send to any peers")
return fmt.Errorf("PSS Was not able to send to any peers")
} else {
self.addFwdCacheExpire(digest)
}
Expand Down Expand Up @@ -436,13 +468,24 @@ func (self *PssProtocol) handle(msg []byte, p *p2p.Peer, senderAddr []byte) erro
return nil
}

func (ps *Pss) isSelfRecipient(msg *PssMsg) bool {
if bytes.Equal(msg.GetRecipient(), ps.Overlay.GetAddr().OverlayAddr()) {
func (self *Pss) IsSelfRecipient(msg *PssMsg) bool {
if bytes.Equal(msg.GetRecipient(), self.Overlay.GetAddr().OverlayAddr()) {
return true
}
return false
}

func (self *Pss) GetPingHandler() func([]byte, *p2p.Peer, []byte) error {
pingtopic, _ := MakeTopic(pingTopicName, pingTopicVersion)
return func(msg []byte, p *p2p.Peer, from []byte) error {
if bytes.Equal([]byte("ping"), msg) {
log.Trace(fmt.Sprintf("swarm pss ping from %x sending pong", common.ByteLabel(from)))
self.Send(from, pingtopic, []byte("pong"))
}
return nil
}
}

// Pre-Whisper placeholder
func makeMsg(code uint64, msg interface{}) ([]byte, error) {

Expand Down
Loading

0 comments on commit ba3d785

Please sign in to comment.