Skip to content

Commit

Permalink
pubsub timeouts tuning (#295)
Browse files Browse the repository at this point in the history
* add finegrained timeouts to pubsub

* use 10 millis timeout in tests

* finalization

* revert timeouts

* use `atEof` for reads

* adjust timeouts and use atEof for reads

* use atEof for reads

* set isEof flag

* no backoff for pubsub streams

* temp timer increase, make macos finalize

* don't call `subscribePeer` in libp2p anymore

* more traces

* leak tests

* lower timeouts

* handle exceptions in control message

* don't use `cancelAndWait`

* handle exceptions in helpers

* wip

* don't send empty messages

* check for leaks properly

* don't use cancelAndWait

* don't await subscribption sends

* remove subscrivePeer calls from switch

* trying without the hooks again
  • Loading branch information
dryajov authored Aug 3, 2020
1 parent 909d965 commit 9807647
Show file tree
Hide file tree
Showing 11 changed files with 205 additions and 87 deletions.
2 changes: 1 addition & 1 deletion libp2p/multistream.nim
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ proc handle*(m: MultistreamSelect, conn: Connection, active: bool = false) {.asy
trace "handle: starting multistream handling", handshaked = active
var handshaked = active
try:
while not conn.closed:
while not conn.atEof:
var ms = string.fromBytes(await conn.readLp(1024))
validateSuffix(ms)

Expand Down
2 changes: 1 addition & 1 deletion libp2p/muxers/mplex/lpchannel.nim
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ template withEOFExceptions(body: untyped): untyped =
proc cleanupTimer(s: LPChannel) {.async.} =
## cleanup timers
if not isNil(s.timerTaskFut) and not s.timerTaskFut.finished:
await s.timerTaskFut.cancelAndWait()
s.timerTaskFut.cancel()

proc closeMessage(s: LPChannel) {.async.} =
logScope:
Expand Down
5 changes: 4 additions & 1 deletion libp2p/muxers/mplex/mplex.nim
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ method handle*(m: Mplex) {.async, gcsafe.} =
trace "stopping mplex main loop", oid = $m.oid
await m.close()

while not m.connection.closed:
while not m.connection.atEof:
trace "waiting for data", oid = $m.oid
let (id, msgType, data) = await m.connection.readMsg()
trace "read message from connection", id = id,
Expand Down Expand Up @@ -169,7 +169,10 @@ method handle*(m: Mplex) {.async, gcsafe.} =
trace "pushing data to channel"

if data.len > MaxMsgSize:
warn "attempting to send a packet larger than allowed", allowed = MaxMsgSize,
sending = data.len
raise newLPStreamLimitError()

await channel.pushTo(data)

of MessageType.CloseIn, MessageType.CloseOut:
Expand Down
9 changes: 5 additions & 4 deletions libp2p/protocols/pubsub/floodsub.nim
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ method rpcHandler*(f: FloodSub,
trace "exception in message handler", exc = exc.msg

# forward the message to all peers interested in it
let published = await f.publishHelper(toSendPeers, m.messages)
let published = await f.publishHelper(toSendPeers, m.messages, DefaultSendTimeout)

trace "forwared message to peers", peers = published

Expand All @@ -120,9 +120,10 @@ method subscribePeer*(p: FloodSub,

method publish*(f: FloodSub,
topic: string,
data: seq[byte]): Future[int] {.async.} =
data: seq[byte],
timeout: Duration = InfiniteDuration): Future[int] {.async.} =
# base returns always 0
discard await procCall PubSub(f).publish(topic, data)
discard await procCall PubSub(f).publish(topic, data, timeout)

if data.len <= 0 or topic.len <= 0:
trace "topic or data missing, skipping publish"
Expand All @@ -137,7 +138,7 @@ method publish*(f: FloodSub,
let msg = Message.init(f.peerInfo, data, topic, f.msgSeqno, f.sign)

# start the future but do not wait yet
let published = await f.publishHelper(f.floodsub.getOrDefault(topic), @[msg])
let published = await f.publishHelper(f.floodsub.getOrDefault(topic), @[msg], timeout)

libp2p_pubsub_messages_published.inc(labelValues = [topic])

Expand Down
27 changes: 16 additions & 11 deletions libp2p/protocols/pubsub/gossipsub.nim
Original file line number Diff line number Diff line change
Expand Up @@ -189,16 +189,14 @@ proc getGossipPeers(g: GossipSub): Table[string, ControlMessage] {.gcsafe.} =

let gossipPeers = mesh + fanout
let mids = g.mcache.window(topic)
if mids.len <= 0:
if not mids.len > 0:
continue

let ihave = ControlIHave(topicID: topic,
messageIDs: toSeq(mids))

if topic notin g.gossipsub:
trace "topic not in gossip array, skipping", topicID = topic
continue

let ihave = ControlIHave(topicID: topic, messageIDs: toSeq(mids))
for peer in allPeers:
if result.len >= GossipSubD:
trace "got gossip peers", peers = result.len
Expand Down Expand Up @@ -422,7 +420,7 @@ method rpcHandler*(g: GossipSub,
trace "exception in message handler", exc = exc.msg

# forward the message to all peers interested in it
let published = await g.publishHelper(toSendPeers, m.messages)
let published = await g.publishHelper(toSendPeers, m.messages, DefaultSendTimeout)

trace "forwared message to peers", peers = published

Expand All @@ -436,9 +434,15 @@ method rpcHandler*(g: GossipSub,
let messages = g.handleIWant(peer, control.iwant)

if respControl.graft.len > 0 or respControl.prune.len > 0 or
respControl.ihave.len > 0 or respControl.iwant.len > 0:
await peer.send(
RPCMsg(control: some(respControl), messages: messages))
respControl.ihave.len > 0:
try:
info "sending control message", msg = respControl
await peer.send(
RPCMsg(control: some(respControl), messages: messages))
except CancelledError as exc:
raise exc
except CatchableError as exc:
trace "exception forwarding control messages", exc = exc.msg

method subscribe*(g: GossipSub,
topic: string,
Expand Down Expand Up @@ -476,9 +480,10 @@ method unsubscribeAll*(g: GossipSub, topic: string) {.async.} =

method publish*(g: GossipSub,
topic: string,
data: seq[byte]): Future[int] {.async.} =
data: seq[byte],
timeout: Duration = InfiniteDuration): Future[int] {.async.} =
# base returns always 0
discard await procCall PubSub(g).publish(topic, data)
discard await procCall PubSub(g).publish(topic, data, timeout)
trace "publishing message on topic", topic, data = data.shortLog

var peers: HashSet[PubSubPeer]
Expand Down Expand Up @@ -512,7 +517,7 @@ method publish*(g: GossipSub,
if msgId notin g.mcache:
g.mcache.put(msgId, msg)

let published = await g.publishHelper(peers, @[msg])
let published = await g.publishHelper(peers, @[msg], timeout)
if published > 0:
libp2p_pubsub_messages_published.inc(labelValues = [topic])

Expand Down
45 changes: 11 additions & 34 deletions libp2p/protocols/pubsub/pubsub.nim
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,6 @@ declareCounter(libp2p_pubsub_validation_failure, "pubsub failed validated messag
declarePublicCounter(libp2p_pubsub_messages_published, "published messages", labels = ["topic"])

type
SendRes = tuple[published: seq[string], failed: seq[string]] # keep private

TopicHandler* = proc(topic: string,
data: seq[byte]): Future[void] {.gcsafe.}

Expand All @@ -51,7 +49,7 @@ type
peerInfo*: PeerInfo # this peer's info
topics*: Table[string, Topic] # local topics
peers*: Table[string, PubSubPeer] # peerid to peer map
conns*: Table[PeerInfo, HashSet[Connection]] # peers connections
conns*: Table[PeerInfo, HashSet[Connection]] # peers connections
triggerSelf*: bool # trigger own local handler on publish
verifySignature*: bool # enable signature verification
sign*: bool # enable message signing
Expand All @@ -66,6 +64,7 @@ method handleDisconnect*(p: PubSub, peer: PubSubPeer) {.base.} =
##
if not(isNil(peer)) and peer.peerInfo notin p.conns:
trace "deleting peer", peer = peer.id
peer.onConnect.fire() # Make sure all pending sends are unblocked
p.peers.del(peer.id)
trace "peer disconnected", peer = peer.id

Expand Down Expand Up @@ -95,24 +94,7 @@ proc sendSubs*(p: PubSub,
topics: seq[string],
subscribe: bool) {.async.} =
## send subscriptions to remote peer

try:
# wait for a connection before publishing
# this happens when
if not peer.onConnect.isSet:
trace "awaiting send connection"
await peer.onConnect.wait()

await peer.sendSubOpts(topics, subscribe)
except CancelledError as exc:
if not(isNil(peer)) and not(isNil(peer.conn)):
await peer.conn.close()

raise exc
except CatchableError as exc:
trace "unable to send subscriptions", exc = exc.msg
if not(isNil(peer)) and not(isNil(peer.conn)):
await peer.conn.close()
asyncCheck peer.sendSubOpts(topics, subscribe)

method subscribeTopic*(p: PubSub,
topic: string,
Expand Down Expand Up @@ -147,7 +129,6 @@ proc getOrCreatePeer(p: PubSub,
p.peers[peer.id] = peer
peer.observers = p.observers

# metrics
libp2p_pubsub_peers.set(p.peers.len.int64)

return peer
Expand Down Expand Up @@ -281,17 +262,19 @@ method subscribe*(p: PubSub,
# metrics
libp2p_pubsub_topics.set(p.topics.len.int64)

proc sendHelper*(p: PubSub,
sendPeers: HashSet[PubSubPeer],
msgs: seq[Message]): Future[SendRes] {.async.} =
proc publishHelper*(p: PubSub,
sendPeers: HashSet[PubSubPeer],
msgs: seq[Message],
timeout: Duration): Future[int] {.async.} =
# send messages and cleanup failed peers
var sent: seq[tuple[id: string, fut: Future[void]]]
for sendPeer in sendPeers:
# avoid sending to self
if sendPeer.peerInfo == p.peerInfo:
continue

trace "sending messages to peer", peer = sendPeer.id, msgs
sent.add((id: sendPeer.id, fut: sendPeer.send(RPCMsg(messages: msgs))))
sent.add((id: sendPeer.id, fut: sendPeer.send(RPCMsg(messages: msgs), timeout)))

var published: seq[string]
var failed: seq[string]
Expand All @@ -306,13 +289,6 @@ proc sendHelper*(p: PubSub,
trace "sending messages to peer succeeded", peer = f[0].id
published.add(f[0].id)

return (published, failed)

proc publishHelper*(p: PubSub,
sendPeers: HashSet[PubSubPeer],
msgs: seq[Message]): Future[int] {.async.} =
# send messages and cleanup failed peers
let (published, failed) = await p.sendHelper(sendPeers, msgs)
for f in failed:
let peer = p.peers.getOrDefault(f)
if not(isNil(peer)) and not(isNil(peer.conn)):
Expand All @@ -322,7 +298,8 @@ proc publishHelper*(p: PubSub,

method publish*(p: PubSub,
topic: string,
data: seq[byte]): Future[int] {.base, async.} =
data: seq[byte],
timeout: Duration = InfiniteDuration): Future[int] {.base, async.} =
## publish to a ``topic``
if p.triggerSelf and topic in p.topics:
for h in p.topics[topic].handler:
Expand Down
70 changes: 56 additions & 14 deletions libp2p/protocols/pubsub/pubsubpeer.nim
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,10 @@ declareCounter(libp2p_pubsub_received_messages, "number of messages received", l
declareCounter(libp2p_pubsub_skipped_received_messages, "number of received skipped messages", labels = ["id"])
declareCounter(libp2p_pubsub_skipped_sent_messages, "number of sent skipped messages", labels = ["id"])

const
DefaultReadTimeout* = 1.minutes
DefaultSendTimeout* = 10.seconds

type
PubSubObserver* = ref object
onRecv*: proc(peer: PubSubPeer; msgs: var RPCMsg) {.gcsafe, raises: [Defect].}
Expand Down Expand Up @@ -81,9 +85,9 @@ proc handle*(p: PubSubPeer, conn: Connection) {.async.} =
debug "starting pubsub read loop for peer", closed = conn.closed
try:
try:
while not conn.closed:
while not conn.atEof:
trace "waiting for data", closed = conn.closed
let data = await conn.readLp(64 * 1024)
let data = await conn.readLp(64 * 1024).wait(DefaultReadTimeout)
let digest = $(sha256.digest(data))
trace "read data from peer", data = data.shortLog
if digest in p.recvdRpcCache:
Expand Down Expand Up @@ -119,7 +123,10 @@ proc handle*(p: PubSubPeer, conn: Connection) {.async.} =
trace "Exception occurred in PubSubPeer.handle", exc = exc.msg
raise exc

proc send*(p: PubSubPeer, msg: RPCMsg) {.async.} =
proc send*(
p: PubSubPeer,
msg: RPCMsg,
timeout: Duration = DefaultSendTimeout) {.async.} =
logScope:
peer = p.id
msg = shortLog(msg)
Expand All @@ -132,7 +139,7 @@ proc send*(p: PubSubPeer, msg: RPCMsg) {.async.} =

let encoded = encodeRpcMsg(mm)
if encoded.len <= 0:
trace "empty message, skipping"
info "empty message, skipping"
return

logScope:
Expand All @@ -144,43 +151,78 @@ proc send*(p: PubSubPeer, msg: RPCMsg) {.async.} =
libp2p_pubsub_skipped_sent_messages.inc(labelValues = [p.id])
return

try:
proc sendToRemote() {.async.} =
logScope:
peer = p.id
msg = shortLog(msg)

trace "about to send message"

if not p.onConnect.isSet:
await p.onConnect.wait()

if p.connected: # this can happen if the remote disconnected
trace "sending encoded msgs to peer"

await p.sendConn.writeLp(encoded)
p.sentRpcCache.put(digest)
trace "sent pubsub message to remote"

for x in mm.messages:
for t in x.topicIDs:
# metrics
libp2p_pubsub_sent_messages.inc(labelValues = [p.id, t])

let sendFut = sendToRemote()
try:
await sendFut.wait(timeout)
except CatchableError as exc:
trace "unable to send to remote", exc = exc.msg
if not sendFut.finished:
sendFut.cancel()

if not(isNil(p.sendConn)):
await p.sendConn.close()
p.sendConn = nil
p.onConnect.clear()

raise exc

proc sendSubOpts*(p: PubSubPeer, topics: seq[string], subscribe: bool): Future[void] =
proc sendSubOpts*(p: PubSubPeer, topics: seq[string], subscribe: bool) {.async.} =
trace "sending subscriptions", peer = p.id, subscribe, topicIDs = topics

p.send(RPCMsg(
subscriptions: topics.mapIt(SubOpts(subscribe: subscribe, topic: it))))
try:
await p.send(RPCMsg(
subscriptions: topics.mapIt(SubOpts(subscribe: subscribe, topic: it))),
# the long timeout is mostly for cases where
# the connection is flaky at the beggingin
timeout = 3.minutes)
except CancelledError as exc:
raise exc
except CatchableError as exc:
trace "exception sending subscriptions", exc = exc.msg

proc sendGraft*(p: PubSubPeer, topics: seq[string]): Future[void] =
proc sendGraft*(p: PubSubPeer, topics: seq[string]) {.async.} =
trace "sending graft to peer", peer = p.id, topicIDs = topics
p.send(RPCMsg(control: some(
ControlMessage(graft: topics.mapIt(ControlGraft(topicID: it))))))

proc sendPrune*(p: PubSubPeer, topics: seq[string]): Future[void] =
try:
await p.send(RPCMsg(control: some(
ControlMessage(graft: topics.mapIt(ControlGraft(topicID: it))))))
except CancelledError as exc:
raise exc
except CatchableError as exc:
trace "exception sending grafts", exc = exc.msg

proc sendPrune*(p: PubSubPeer, topics: seq[string]) {.async.} =
trace "sending prune to peer", peer = p.id, topicIDs = topics
p.send(RPCMsg(control: some(
ControlMessage(prune: topics.mapIt(ControlPrune(topicID: it))))))

try:
await p.send(RPCMsg(control: some(
ControlMessage(prune: topics.mapIt(ControlPrune(topicID: it))))))
except CancelledError as exc:
raise exc
except CatchableError as exc:
trace "exception sending prunes", exc = exc.msg

proc `$`*(p: PubSubPeer): string =
p.id
Expand Down
Loading

0 comments on commit 9807647

Please sign in to comment.