Skip to content

Commit

Permalink
floodsub: add api for pub/sub
Browse files Browse the repository at this point in the history
License: MIT
Signed-off-by: Jeromy <[email protected]>
  • Loading branch information
whyrusleeping committed Sep 10, 2016
1 parent e30576d commit 20ff2f8
Show file tree
Hide file tree
Showing 3 changed files with 147 additions and 0 deletions.
141 changes: 141 additions & 0 deletions core/commands/pubsub.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
package commands

import (
"bytes"
"fmt"
"io"
"reflect"

u "gx/ipfs/QmZNVWh8LLjAavuQ2JXuFmuYH3C11xo988vSgp7UQrTRj1/go-ipfs-util"

cmds "github.com/ipfs/go-ipfs/commands"
floodsub "github.com/whyrusleeping/go-floodsub"
)

var PubsubCmd = &cmds.Command{
Helptext: cmds.HelpText{
Tagline: "Only pubsub by the vaguest technical definition",
},
Subcommands: map[string]*cmds.Command{
"pub": PubsubPubCmd,
"sub": PubsubSubCmd,
},
}
var PubsubSubCmd = &cmds.Command{
Helptext: cmds.HelpText{
Tagline: "subscribe to messages on a given topic",
},
Arguments: []cmds.Argument{
cmds.StringArg("topic", true, false, "String name of topic to subscribe to."),
},
Run: func(req cmds.Request, res cmds.Response) {
n, err := req.InvocContext().GetNode()
if err != nil {
res.SetError(err, cmds.ErrNormal)
return
}

// Must be online!
if !n.OnlineMode() {
res.SetError(errNotOnline, cmds.ErrClient)
return
}

topic := req.Arguments()[0]
msgs, err := n.Floodsub.Subscribe(topic)
if err != nil {
res.SetError(err, cmds.ErrNormal)
return
}

out := make(chan interface{})
res.SetOutput((<-chan interface{})(out))

ctx := req.Context()
go func() {
defer close(out)
for {
select {
case msg, ok := <-msgs:
if !ok {
return
}
out <- msg
case <-ctx.Done():
n.Floodsub.Unsub(topic)
}
}
}()
},
Marshalers: cmds.MarshalerMap{
cmds.Text: func(res cmds.Response) (io.Reader, error) {
outChan, ok := res.Output().(<-chan interface{})
if !ok {
fmt.Println(reflect.TypeOf(res.Output()))
return nil, u.ErrCast()
}

marshal := func(v interface{}) (io.Reader, error) {
obj, ok := v.(*floodsub.Message)
if !ok {
return nil, u.ErrCast()
}

return bytes.NewReader(obj.Data), nil
}

return &cmds.ChannelMarshaler{
Channel: outChan,
Marshaler: marshal,
Res: res,
}, nil
},
},
Type: floodsub.Message{},
}
var PubsubPubCmd = &cmds.Command{
Helptext: cmds.HelpText{
Tagline: "Publish a message to a given pubsub sub topic.",
ShortDescription: `pubsub sub pub pub hubbub sub?
pub. Pubsub subsub subpubsub hubbub pub sub grub subhubbub.
Pubsubs sub hubbub sub tub, dub dub dub wub wub wub wub wub. Pubsub.
Pubsub subsub pubsub hubbub dubtub? Wub. Wub. Pubsub.
Nubs, Tubs, Wubs, pub Dubs. Pub subdub pub pub sub nub.
ipfs pubsub pub "pubsub hubbub"
Pub: hubbub pub sub sub tub pub bub.
`,
},
Arguments: []cmds.Argument{
cmds.StringArg("data", true, false, "Data to send to david dias. (and only him)").EnableStdin(),
},
Options: []cmds.Option{
cmds.StringOption("topic", "t", "Topic to pubusb to."),
},
Run: func(req cmds.Request, res cmds.Response) {
n, err := req.InvocContext().GetNode()
if err != nil {
res.SetError(err, cmds.ErrNormal)
return
}

// Must be online!
if !n.OnlineMode() {
res.SetError(errNotOnline, cmds.ErrClient)
return
}

topic, found, _ := req.Option("topic").String()
if !found {
res.SetError(fmt.Errorf("topic required"), cmds.ErrNormal)
return
}

err = n.Floodsub.Publish(topic, []byte(req.Arguments()[0]))
if err != nil {
res.SetError(err, cmds.ErrNormal)
return
}
},
}
1 change: 1 addition & 0 deletions core/commands/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@ var rootSubcommands = map[string]*cmds.Command{
"stats": StatsCmd,
"swarm": SwarmCmd,
"tar": TarCmd,
"pubsub": PubsubCmd,
"tour": tourCmd,
"file": unixfs.UnixFSCmd,
"update": ExternalBinary(),
Expand Down
5 changes: 5 additions & 0 deletions core/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"time"

diag "github.com/ipfs/go-ipfs/diagnostics"
floodsub "github.com/whyrusleeping/go-floodsub"
goprocess "gx/ipfs/QmSF8fPo3jgVBAy8fpdjjYqgG87dkJgUprRBHRd2tmfgpP/goprocess"
mamask "gx/ipfs/QmSMZwvs3n4GBikZ7hKzT17c3bk65FmyZo2JqtJ16swqCv/multiaddr-filter"
pstore "gx/ipfs/QmSZi9ygLohBUGyHMqE5N6eToPwqcg7bZQTULeVLFu7Q6d/go-libp2p-peerstore"
Expand Down Expand Up @@ -112,6 +113,8 @@ type IpfsNode struct {
Reprovider *rp.Reprovider // the value reprovider system
IpnsRepub *ipnsrp.Republisher

Floodsub *floodsub.PubSub

proc goprocess.Process
ctx context.Context

Expand Down Expand Up @@ -184,6 +187,8 @@ func (n *IpfsNode) startOnlineServices(ctx context.Context, routingOption Routin
go n.Reprovider.ProvideEvery(ctx, interval)
}

n.Floodsub = floodsub.NewFloodSub(peerhost)

// setup local discovery
if do != nil {
service, err := do(n.PeerHost)
Expand Down

0 comments on commit 20ff2f8

Please sign in to comment.