Skip to content

Commit

Permalink
coreapi: implement pubsub api
Browse files Browse the repository at this point in the history
License: MIT
Signed-off-by: Łukasz Magiera <[email protected]>
  • Loading branch information
magik6k committed Mar 10, 2018
1 parent 63b108d commit 7dd8ff6
Show file tree
Hide file tree
Showing 5 changed files with 120 additions and 4 deletions.
5 changes: 5 additions & 0 deletions core/coreapi/coreapi.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,11 @@ func (api *CoreAPI) Pin() coreiface.PinAPI {
return &PinAPI{api, nil}
}

// PubSub returns the PubSubAPI interface implementation backed by the go-ipfs node
func (api *CoreAPI) PubSub() coreiface.PubSubAPI {
return &PubSubAPI{api, nil}
}

// ResolveNode resolves the path `p` using Unixfx resolver, gets and returns the
// resolved Node.
func (api *CoreAPI) ResolveNode(ctx context.Context, p coreiface.Path) (ipld.Node, error) {
Expand Down
3 changes: 3 additions & 0 deletions core/coreapi/interface/coreapi.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,9 @@ type CoreAPI interface {
// ObjectAPI returns an implementation of Object API
Object() ObjectAPI

// PubSub returns an implementation of PubSub API
PubSub() PubSubAPI

// ResolvePath resolves the path using Unixfs resolver
ResolvePath(context.Context, Path) (Path, error)

Expand Down
2 changes: 1 addition & 1 deletion core/coreapi/interface/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,4 @@ package iface
import "errors"

var ErrIsDir = errors.New("object is a directory")
var ErrOffline = errors.New("can't resolve, ipfs node is offline")
var ErrOffline = errors.New("this action must be run in online mode, try running 'ipfs daemon' first")
6 changes: 3 additions & 3 deletions core/coreapi/interface/pubsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@ import (
type PubSubSubscription interface {
io.Closer

// Chan return incoming message channel
Chan(context.Context) <-chan PubSubMessage
// Next return the next incoming message
Next(context.Context) (PubSubMessage, error)
}

// PubSubMessage is a single PubSub message
Expand Down Expand Up @@ -43,7 +43,7 @@ type PubSubAPI interface {
Publish(context.Context, string, []byte) error

// Subscribe to messages on a given topic
Subscribe(context.Context, string) (PubSubSubscription, error)
Subscribe(context.Context, string, ...options.PubSubSubscribeOption) (PubSubSubscription, error)

// WithDiscover is an option for Subscribe which specifies whether to try to
// discover other peers subscribed to the same topic
Expand Down
108 changes: 108 additions & 0 deletions core/coreapi/pubsub.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
package coreapi

import (
"context"
"errors"

coreiface "github.com/ipfs/go-ipfs/core/coreapi/interface"
caopts "github.com/ipfs/go-ipfs/core/coreapi/interface/options"

floodsub "gx/ipfs/QmSFihvoND3eDaAYRCeLgLPt62yCPgMZs1NSZmKFEtJQQw/go-libp2p-floodsub"
peer "gx/ipfs/QmZoWKhxUmZ2seW4BzX6fJkNR8hh9PsGModr7q171yq2SS/go-libp2p-peer"
)

type PubSubAPI struct {
*CoreAPI
*caopts.PubSubOptions
}

type pubSubSubscription struct {
subscription *floodsub.Subscription
}

type pubSubMessage struct {
msg *floodsub.Message
}

func (api *PubSubAPI) Ls(ctx context.Context) ([]string, error) {
if err := api.checkNode(); err != nil {
return nil, err
}

return api.node.Floodsub.GetTopics(), nil
}

func (api *PubSubAPI) Peers(ctx context.Context, opts ...caopts.PubSubPeersOption) ([]peer.ID, error) {
if err := api.checkNode(); err != nil {
return nil, err
}

settings, err := caopts.PubSubPeersOptions(opts...)
if err != nil {
return nil, err
}

peers := api.node.Floodsub.ListPeers(settings.Topic)
out := make([]peer.ID, len(peers))

for i, peer := range peers {
out[i] = peer
}

return out, nil
}

func (api *PubSubAPI) Publish(ctx context.Context, topic string, data []byte) error {
if err := api.checkNode(); err != nil {
return err
}

return api.node.Floodsub.Publish(topic, data)
}

func (api *PubSubAPI) Subscribe(ctx context.Context, topic string, opts ...caopts.PubSubSubscribeOption) (coreiface.PubSubSubscription, error) {
if err := api.checkNode(); err != nil {
return nil, err
}

sub, err := api.node.Floodsub.Subscribe(topic)
if err != nil {
return nil, err
}

return &pubSubSubscription{sub}, nil
}

func (api *PubSubAPI) checkNode() error {
if !api.node.OnlineMode() {
return coreiface.ErrOffline
}

if api.node.Floodsub == nil {
return errors.New("experimental pubsub feature not enabled. Run daemon with --enable-pubsub-experiment to use.")
}

return nil
}

func (sub *pubSubSubscription) Close() error {
sub.subscription.Cancel()
return nil
}

func (sub *pubSubSubscription) Next(ctx context.Context) (coreiface.PubSubMessage, error) {
msg, err := sub.subscription.Next(ctx)
if err != nil {
return nil, err
}

return &pubSubMessage{msg}, nil
}

func (msg *pubSubMessage) From() peer.ID {
return peer.ID(msg.msg.From)
}

func (msg *pubSubMessage) Data() []byte {
return msg.msg.Data
}

0 comments on commit 7dd8ff6

Please sign in to comment.