Skip to content

Commit

Permalink
fix publishing (#250)
Browse files Browse the repository at this point in the history
* use var semantics to optimize table access

* wip... lvalues don't work properly sadly...

* big publish refactor, replenish and balance

* fix internal tests

* use g.peers for fanout (todo: don't include flood peers)

* exclude non gossip from fanout

* internal test fixes

* fix flood tests

* fix test's trypublish

* test interop fixes

* make sure to not remove peers from gossip table

* restore old replenishFanout

* cleanups

* Cleanup resources (#246)

* consolidate reading in lpstream

* remove debug echo

* tune log level

* add channel cleanup and cancelation handling

* cancelation handling

* cancelation handling

* cancelation handling

* cancelation handling

* cleanup and cancelation handling

* cancelation handling

* cancelation

* tests

* rename isConnected to connected

* remove testing trace

* comment out debug stacktraces

* explicit raises

* restore trace vs debug in gossip

* improve fanout replenish behavior further

* cleanup stale peers more eaguerly

* synchronize connection cleanup and small refactor

* close client first and call parent second

* disconnect failed peers on publish

* check for publish result

* fix tests

* fix tests

* always call close

Co-authored-by: Giovanni Petrantoni <[email protected]>
  • Loading branch information
dryajov and sinkingsugar authored Jul 8, 2020
1 parent 775cab4 commit a52763c
Show file tree
Hide file tree
Showing 12 changed files with 493 additions and 374 deletions.
2 changes: 1 addition & 1 deletion libp2p/errors.nim
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ template tryAndWarn*(message: static[string]; body: untyped): untyped =
try:
body
except CancelledError as exc:
raise exc # TODO: why catch and re-raise?
raise exc
except CatchableError as exc:
warn "An exception has ocurred, enable trace logging for details", name = exc.name, msg = message
trace "Exception details", exc = exc.msg
54 changes: 28 additions & 26 deletions libp2p/protocols/pubsub/floodsub.nim
Original file line number Diff line number Diff line change
Expand Up @@ -47,14 +47,14 @@ method subscribeTopic*(f: FloodSub,
# unsubscribe the peer from the topic
f.floodsub[topic].excl(peerId)

method handleDisconnect*(f: FloodSub, peer: PubSubPeer) {.async.} =
await procCall PubSub(f).handleDisconnect(peer)

method handleDisconnect*(f: FloodSub, peer: PubSubPeer) =
## handle peer disconnects
for t in toSeq(f.floodsub.keys):
if t in f.floodsub:
f.floodsub[t].excl(peer.id)

procCall PubSub(f).handleDisconnect(peer)

method rpcHandler*(f: FloodSub,
peer: PubSubPeer,
rpcMsgs: seq[RPCMsg]) {.async.} =
Expand Down Expand Up @@ -86,18 +86,20 @@ method rpcHandler*(f: FloodSub,
trace "calling handler for message", topicId = t,
localPeer = f.peerInfo.id,
fromPeer = msg.fromPeer.pretty
await h(t, msg.data) # trigger user provided handler

try:
await h(t, msg.data) # trigger user provided handler
except CatchableError as exc:
trace "exception in message handler", exc = exc.msg

# forward the message to all peers interested in it
var sent: seq[Future[void]]
# start the future but do not wait yet
for p in toSendPeers:
if p in f.peers and f.peers[p].id != peer.id:
sent.add(f.peers[p].send(@[RPCMsg(messages: m.messages)]))
let (published, failed) = await f.sendHelper(toSendPeers, m.messages)
for p in failed:
let peer = f.peers.getOrDefault(p)
if not(isNil(peer)):
f.handleDisconnect(peer) # cleanup failed peers

# wait for all the futures now
sent = await allFinished(sent)
checkFutures(sent)
trace "forwared message to peers", peers = published.len

method init*(f: FloodSub) =
proc handler(conn: Connection, proto: string) {.async.} =
Expand All @@ -111,15 +113,16 @@ method init*(f: FloodSub) =
f.handler = handler
f.codec = FloodSubCodec

method subscribeToPeer*(p: FloodSub,
conn: Connection) {.async.} =
await procCall PubSub(p).subscribeToPeer(conn)
method subscribePeer*(p: FloodSub,
conn: Connection) =
procCall PubSub(p).subscribePeer(conn)
asyncCheck p.handleConn(conn, FloodSubCodec)

method publish*(f: FloodSub,
topic: string,
data: seq[byte]) {.async.} =
await procCall PubSub(f).publish(topic, data)
data: seq[byte]): Future[int] {.async.} =
# base returns always 0
discard await procCall PubSub(f).publish(topic, data)

if data.len <= 0 or topic.len <= 0:
trace "topic or data missing, skipping publish"
Expand All @@ -131,19 +134,18 @@ method publish*(f: FloodSub,

trace "publishing on topic", name = topic
let msg = Message.init(f.peerInfo, data, topic, f.sign)
var sent: seq[Future[void]]
# start the future but do not wait yet
for p in f.floodsub.getOrDefault(topic):
if p in f.peers:
trace "publishing message", name = topic, peer = p, data = data.shortLog
sent.add(f.peers[p].send(@[RPCMsg(messages: @[msg])]))

# wait for all the futures now
sent = await allFinished(sent)
checkFutures(sent)
let (published, failed) = await f.sendHelper(f.floodsub.getOrDefault(topic), @[msg])
for p in failed:
let peer = f.peers.getOrDefault(p)
f.handleDisconnect(peer) # cleanup failed peers

libp2p_pubsub_messages_published.inc(labelValues = [topic])

trace "published message to peers", peers = published.len,
msg = msg.shortLog()
return published.len

method unsubscribe*(f: FloodSub,
topics: seq[TopicPair]) {.async.} =
await procCall PubSub(f).unsubscribe(topics)
Expand Down
Loading

0 comments on commit a52763c

Please sign in to comment.