Skip to content

Commit

Permalink
pubsub cmd: switch to coreapi
Browse files Browse the repository at this point in the history
License: MIT
Signed-off-by: Łukasz Magiera <[email protected]>
  • Loading branch information
magik6k committed Sep 11, 2018
1 parent 99b57f7 commit d85d79c
Show file tree
Hide file tree
Showing 4 changed files with 102 additions and 103 deletions.
139 changes: 36 additions & 103 deletions core/commands/pubsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,19 +7,13 @@ import (
"io"
"net/http"
"sort"
"sync"
"time"

core "github.com/ipfs/go-ipfs/core"
cmdenv "github.com/ipfs/go-ipfs/core/commands/cmdenv"
e "github.com/ipfs/go-ipfs/core/commands/e"
options "github.com/ipfs/go-ipfs/core/coreapi/interface/options"

cmds "gx/ipfs/QmPTfgFTo9PFr1PvPKyKoeMgBvYPh6cX3aDP7DHKVbnCbi/go-ipfs-cmds"
cmdkit "gx/ipfs/QmSP88ryZkHSRn1fnngAaV2Vcn63WUJzAavnRM9CVdU1Ky/go-ipfs-cmdkit"
blocks "gx/ipfs/QmWAzSEoqZ6xU6pu8yL8e5WaMb7wtbfbhhN4p1DknUPtr3/go-block-format"
floodsub "gx/ipfs/QmY1L5krVk8dv8d74uESmJTXGpoigVYqBVxXXz1aS8aFSb/go-libp2p-floodsub"
cid "gx/ipfs/QmZFbDTY9jfSBms2MchvYM9oYRbAF19K7Pby47yDBfpPrb/go-cid"
pstore "gx/ipfs/Qmda4cPRvSRyox3SqgJN6DfSZGU5TtHufPTp9uXjFj71X6/go-libp2p-peerstore"
)

var PubsubCmd = &cmds.Command{
Expand All @@ -43,6 +37,13 @@ To use, the daemon must be run with '--enable-pubsub-experiment'.
},
}

type pubsubMessage struct {
From []byte `json:"from,omitempty"`
Data []byte `json:"data,omitempty"`
Seqno []byte `json:"seqno,omitempty"`
TopicIDs []string `json:"topicIDs,omitempty"`
}

var PubsubSubCmd = &cmds.Command{
Helptext: cmdkit.HelpText{
Tagline: "Subscribe to messages on a given topic.",
Expand Down Expand Up @@ -74,44 +75,17 @@ This command outputs data in the following encodings:
cmdkit.BoolOption("discover", "try to discover other peers subscribed to the same topic"),
},
Run: func(req *cmds.Request, res cmds.ResponseEmitter, env cmds.Environment) {
n, err := cmdenv.GetNode(env)
api, err := cmdenv.GetApi(env)
if err != nil {
res.SetError(err, cmdkit.ErrNormal)
return
}

// Must be online!
if !n.OnlineMode() {
res.SetError(ErrNotOnline, cmdkit.ErrClient)
return
}

if n.Floodsub == nil {
res.SetError(fmt.Errorf("experimental pubsub feature not enabled. Run daemon with --enable-pubsub-experiment to use"), cmdkit.ErrNormal)
return
}

topic := req.Arguments[0]
sub, err := n.Floodsub.Subscribe(topic)
if err != nil {
res.SetError(err, cmdkit.ErrNormal)
return
}
defer sub.Cancel()

discover, _ := req.Options["discover"].(bool)
if discover {
go func() {
blk := blocks.NewBlock([]byte("floodsub:" + topic))
err := n.Blocks.AddBlock(blk)
if err != nil {
log.Error("pubsub discovery: ", err)
return
}

connectToPubSubPeers(req.Context, n, blk.Cid())
}()
}

sub, err := api.PubSub().Subscribe(req.Context, topic, options.PubSub.Discover(discover))
defer sub.Close()

if f, ok := res.(http.Flusher); ok {
f.Flush()
Expand All @@ -126,12 +100,17 @@ This command outputs data in the following encodings:
return
}

res.Emit(msg)
res.Emit(&pubsubMessage{
Data: msg.Data(),
From: []byte(msg.From()),
Seqno: msg.Seq(),
TopicIDs: msg.Topics(),
})
}
},
Encoders: cmds.EncoderMap{
cmds.Text: cmds.MakeEncoder(func(req *cmds.Request, w io.Writer, v interface{}) error {
m, ok := v.(*floodsub.Message)
m, ok := v.(*pubsubMessage)
if !ok {
return fmt.Errorf("unexpected type: %T", v)
}
Expand All @@ -140,7 +119,7 @@ This command outputs data in the following encodings:
return err
}),
"ndpayload": cmds.MakeEncoder(func(req *cmds.Request, w io.Writer, v interface{}) error {
m, ok := v.(*floodsub.Message)
m, ok := v.(*pubsubMessage)
if !ok {
return fmt.Errorf("unexpected type: %T", v)
}
Expand All @@ -150,7 +129,7 @@ This command outputs data in the following encodings:
return err
}),
"lenpayload": cmds.MakeEncoder(func(req *cmds.Request, w io.Writer, v interface{}) error {
m, ok := v.(*floodsub.Message)
m, ok := v.(*pubsubMessage)
if !ok {
return fmt.Errorf("unexpected type: %T", v)
}
Expand All @@ -163,31 +142,7 @@ This command outputs data in the following encodings:
return err
}),
},
Type: floodsub.Message{},
}

func connectToPubSubPeers(ctx context.Context, n *core.IpfsNode, cid *cid.Cid) {
ctx, cancel := context.WithCancel(ctx)
defer cancel()

provs := n.Routing.FindProvidersAsync(ctx, cid, 10)
wg := &sync.WaitGroup{}
for p := range provs {
wg.Add(1)
go func(pi pstore.PeerInfo) {
defer wg.Done()
ctx, cancel := context.WithTimeout(ctx, time.Second*10)
defer cancel()
err := n.PeerHost.Connect(ctx, pi)
if err != nil {
log.Info("pubsub discover: ", err)
return
}
log.Info("connected to pubsub peer:", pi.ID)
}(p)
}

wg.Wait()
Type: pubsubMessage{},
}

var PubsubPubCmd = &cmds.Command{
Expand All @@ -207,23 +162,12 @@ To use, the daemon must be run with '--enable-pubsub-experiment'.
cmdkit.StringArg("data", true, true, "Payload of message to publish.").EnableStdin(),
},
Run: func(req *cmds.Request, res cmds.ResponseEmitter, env cmds.Environment) {
n, err := cmdenv.GetNode(env)
api, err := cmdenv.GetApi(env)
if err != nil {
res.SetError(err, cmdkit.ErrNormal)
return
}

// Must be online!
if !n.OnlineMode() {
res.SetError(ErrNotOnline, cmdkit.ErrClient)
return
}

if n.Floodsub == nil {
res.SetError("experimental pubsub feature not enabled. Run daemon with --enable-pubsub-experiment to use.", cmdkit.ErrNormal)
return
}

topic := req.Arguments[0]

err = req.ParseBodyArgs()
Expand All @@ -233,7 +177,7 @@ To use, the daemon must be run with '--enable-pubsub-experiment'.
}

for _, data := range req.Arguments[1:] {
if err := n.Floodsub.Publish(topic, []byte(data)); err != nil {
if err := api.PubSub().Publish(req.Context, topic, []byte(data)); err != nil {
res.SetError(err, cmdkit.ErrNormal)
return
}
Expand All @@ -254,24 +198,19 @@ To use, the daemon must be run with '--enable-pubsub-experiment'.
`,
},
Run: func(req *cmds.Request, res cmds.ResponseEmitter, env cmds.Environment) {
n, err := cmdenv.GetNode(env)
api, err := cmdenv.GetApi(env)
if err != nil {
res.SetError(err, cmdkit.ErrNormal)
return
}

// Must be online!
if !n.OnlineMode() {
res.SetError(ErrNotOnline, cmdkit.ErrClient)
return
}

if n.Floodsub == nil {
res.SetError("experimental pubsub feature not enabled. Run daemon with --enable-pubsub-experiment to use.", cmdkit.ErrNormal)
l, err := api.PubSub().Ls(req.Context)
if err != nil {
res.SetError(err, cmdkit.ErrNormal)
return
}

cmds.EmitOnce(res, stringList{n.Floodsub.GetTopics()})
cmds.EmitOnce(res, stringList{l})
},
Type: stringList{},
Encoders: cmds.EncoderMap{
Expand Down Expand Up @@ -311,29 +250,23 @@ To use, the daemon must be run with '--enable-pubsub-experiment'.
cmdkit.StringArg("topic", false, false, "topic to list connected peers of"),
},
Run: func(req *cmds.Request, res cmds.ResponseEmitter, env cmds.Environment) {
n, err := cmdenv.GetNode(env)
api, err := cmdenv.GetApi(env)
if err != nil {
res.SetError(err, cmdkit.ErrNormal)
return
}

// Must be online!
if !n.OnlineMode() {
res.SetError(ErrNotOnline, cmdkit.ErrClient)
return
}

if n.Floodsub == nil {
res.SetError(fmt.Errorf("experimental pubsub feature not enabled. Run daemon with --enable-pubsub-experiment to use"), cmdkit.ErrNormal)
return
}

var topic string
if len(req.Arguments) == 1 {
topic = req.Arguments[0]
}

peers := n.Floodsub.ListPeers(topic)
peers, err := api.PubSub().Peers(req.Context, options.PubSub.Topic(topic))
if err != nil {
res.SetError(err, cmdkit.ErrNormal)
return
}

list := &stringList{make([]string, 0, len(peers))}

for _, peer := range peers {
Expand Down
4 changes: 4 additions & 0 deletions core/coreapi/coreapi.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,12 @@ package coreapi
import (
core "github.com/ipfs/go-ipfs/core"
coreiface "github.com/ipfs/go-ipfs/core/coreapi/interface"

logging "gx/ipfs/QmRREK2CAZ5Re2Bd9zZFG6FeYDppUWt5cMgsoUEp3ktgSr/go-log"
)

var log = logging.Logger("core/coreapi")

type CoreAPI struct {
node *core.IpfsNode
}
Expand Down
6 changes: 6 additions & 0 deletions core/coreapi/interface/pubsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,12 @@ type PubSubMessage interface {

// Data returns the message body
Data() []byte

// Seq returns message identifier
Seq() []byte

// Topics returns list of topics this message was set to
Topics() []string
}

// PubSubAPI specifies the interface to PubSub
Expand Down
56 changes: 56 additions & 0 deletions core/coreapi/pubsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,18 @@ package coreapi
import (
"context"
"errors"
"strings"
"sync"
"time"

core "github.com/ipfs/go-ipfs/core"
coreiface "github.com/ipfs/go-ipfs/core/coreapi/interface"
caopts "github.com/ipfs/go-ipfs/core/coreapi/interface/options"

peer "gx/ipfs/QmQsErDt8Qgw1XrsXf2BpEzDgGWtB1YLsTAARBup5b6B9W/go-libp2p-peer"
floodsub "gx/ipfs/QmY1L5krVk8dv8d74uESmJTXGpoigVYqBVxXXz1aS8aFSb/go-libp2p-floodsub"
cid "gx/ipfs/QmZFbDTY9jfSBms2MchvYM9oYRbAF19K7Pby47yDBfpPrb/go-cid"
pstore "gx/ipfs/Qmda4cPRvSRyox3SqgJN6DfSZGU5TtHufPTp9uXjFj71X6/go-libp2p-peerstore"
)

type PubSubAPI CoreAPI
Expand Down Expand Up @@ -58,6 +64,8 @@ func (api *PubSubAPI) Publish(ctx context.Context, topic string, data []byte) er
}

func (api *PubSubAPI) Subscribe(ctx context.Context, topic string, opts ...caopts.PubSubSubscribeOption) (coreiface.PubSubSubscription, error) {
options, err := caopts.PubSubSubscribeOptions(opts...)

if err := api.checkNode(); err != nil {
return nil, err
}
Expand All @@ -67,9 +75,45 @@ func (api *PubSubAPI) Subscribe(ctx context.Context, topic string, opts ...caopt
return nil, err
}

if options.Discover {
go func() {
blk, err := api.core().Block().Put(ctx, strings.NewReader("floodsub:"+topic))
if err != nil {
log.Error("pubsub discovery: ", err)
return
}

connectToPubSubPeers(ctx, api.node, blk.Path().Cid())
}()
}

return &pubSubSubscription{sub}, nil
}

func connectToPubSubPeers(ctx context.Context, n *core.IpfsNode, cid *cid.Cid) {
ctx, cancel := context.WithCancel(ctx)
defer cancel()

provs := n.Routing.FindProvidersAsync(ctx, cid, 10)
wg := &sync.WaitGroup{}
for p := range provs {
wg.Add(1)
go func(pi pstore.PeerInfo) {
defer wg.Done()
ctx, cancel := context.WithTimeout(ctx, time.Second*10)
defer cancel()
err := n.PeerHost.Connect(ctx, pi)
if err != nil {
log.Info("pubsub discover: ", err)
return
}
log.Info("connected to pubsub peer:", pi.ID)
}(p)
}

wg.Wait()
}

func (api *PubSubAPI) checkNode() error {
if !api.node.OnlineMode() {
return coreiface.ErrOffline
Expand Down Expand Up @@ -103,3 +147,15 @@ func (msg *pubSubMessage) From() peer.ID {
func (msg *pubSubMessage) Data() []byte {
return msg.msg.Data
}

func (msg *pubSubMessage) Seq() []byte {
return msg.msg.Seqno
}

func (msg *pubSubMessage) Topics() []string {
return msg.msg.TopicIDs
}

func (api *PubSubAPI) core() coreiface.CoreAPI {
return (*CoreAPI)(api)
}

0 comments on commit d85d79c

Please sign in to comment.