Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix publishing #250

Merged
merged 24 commits into from
Jul 8, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion libp2p/errors.nim
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ template tryAndWarn*(message: static[string]; body: untyped): untyped =
try:
body
except CancelledError as exc:
raise exc # TODO: why catch and re-raise?
raise exc
except CatchableError as exc:
warn "An exception has ocurred, enable trace logging for details", name = exc.name, msg = message
trace "Exception details", exc = exc.msg
54 changes: 28 additions & 26 deletions libp2p/protocols/pubsub/floodsub.nim
Original file line number Diff line number Diff line change
Expand Up @@ -47,14 +47,14 @@ method subscribeTopic*(f: FloodSub,
# unsubscribe the peer from the topic
f.floodsub[topic].excl(peerId)

method handleDisconnect*(f: FloodSub, peer: PubSubPeer) {.async.} =
await procCall PubSub(f).handleDisconnect(peer)

method handleDisconnect*(f: FloodSub, peer: PubSubPeer) =
## handle peer disconnects
for t in toSeq(f.floodsub.keys):
if t in f.floodsub:
f.floodsub[t].excl(peer.id)

procCall PubSub(f).handleDisconnect(peer)

method rpcHandler*(f: FloodSub,
peer: PubSubPeer,
rpcMsgs: seq[RPCMsg]) {.async.} =
Expand Down Expand Up @@ -86,18 +86,20 @@ method rpcHandler*(f: FloodSub,
trace "calling handler for message", topicId = t,
localPeer = f.peerInfo.id,
fromPeer = msg.fromPeer.pretty
await h(t, msg.data) # trigger user provided handler

try:
await h(t, msg.data) # trigger user provided handler
except CatchableError as exc:
trace "exception in message handler", exc = exc.msg

# forward the message to all peers interested in it
var sent: seq[Future[void]]
# start the future but do not wait yet
for p in toSendPeers:
if p in f.peers and f.peers[p].id != peer.id:
sent.add(f.peers[p].send(@[RPCMsg(messages: m.messages)]))
let (published, failed) = await f.sendHelper(toSendPeers, m.messages)
for p in failed:
let peer = f.peers.getOrDefault(p)
if not(isNil(peer)):
f.handleDisconnect(peer) # cleanup failed peers

# wait for all the futures now
sent = await allFinished(sent)
checkFutures(sent)
trace "forwared message to peers", peers = published.len

method init*(f: FloodSub) =
proc handler(conn: Connection, proto: string) {.async.} =
Expand All @@ -111,15 +113,16 @@ method init*(f: FloodSub) =
f.handler = handler
f.codec = FloodSubCodec

method subscribeToPeer*(p: FloodSub,
conn: Connection) {.async.} =
await procCall PubSub(p).subscribeToPeer(conn)
method subscribePeer*(p: FloodSub,
conn: Connection) =
procCall PubSub(p).subscribePeer(conn)
asyncCheck p.handleConn(conn, FloodSubCodec)

method publish*(f: FloodSub,
topic: string,
data: seq[byte]) {.async.} =
await procCall PubSub(f).publish(topic, data)
data: seq[byte]): Future[int] {.async.} =
# base returns always 0
discard await procCall PubSub(f).publish(topic, data)

if data.len <= 0 or topic.len <= 0:
trace "topic or data missing, skipping publish"
Expand All @@ -131,19 +134,18 @@ method publish*(f: FloodSub,

trace "publishing on topic", name = topic
let msg = Message.init(f.peerInfo, data, topic, f.sign)
var sent: seq[Future[void]]
# start the future but do not wait yet
for p in f.floodsub.getOrDefault(topic):
if p in f.peers:
trace "publishing message", name = topic, peer = p, data = data.shortLog
sent.add(f.peers[p].send(@[RPCMsg(messages: @[msg])]))

# wait for all the futures now
sent = await allFinished(sent)
checkFutures(sent)
let (published, failed) = await f.sendHelper(f.floodsub.getOrDefault(topic), @[msg])
for p in failed:
let peer = f.peers.getOrDefault(p)
f.handleDisconnect(peer) # cleanup failed peers

libp2p_pubsub_messages_published.inc(labelValues = [topic])

trace "published message to peers", peers = published.len,
msg = msg.shortLog()
return published.len

method unsubscribe*(f: FloodSub,
topics: seq[TopicPair]) {.async.} =
await procCall PubSub(f).unsubscribe(topics)
Expand Down
Loading