diff --git a/joinmarket/irc.py b/joinmarket/irc.py index f7b5253a..4a39fc58 100644 --- a/joinmarket/irc.py +++ b/joinmarket/irc.py @@ -6,6 +6,7 @@ import ssl import threading import time +import Queue from joinmarket.configure import jm_single, get_config_irc_channel from joinmarket.message_channel import MessageChannel, CJPeerError @@ -58,6 +59,57 @@ def get_irc_nick(source): return source[1:source.find('!')] +class ThrottleThread(threading.Thread): + + def __init__(self, irc): + threading.Thread.__init__(self) + self.daemon = True + self.irc = irc + self.msg_buffer = [] + #TODO - probably global configuration? + self.MSG_INTERVAL = 0.3 + self.B_PER_SEC = 200 + self.B_PER_SEC_INTERVAL = 10.0 + + def run(self): + log.debug("starting throttle thread") + last_msg_time = 0 + while not self.irc.give_up: + self.irc.lockthrottle.acquire() + while not (self.irc.throttleQ.empty() and self.irc.obQ.empty()): + #First throttling mechanism: no more than 1 line + #per self.MSG_INTERVAL seconds. + x = time.time() - last_msg_time + if x < self.MSG_INTERVAL: + time.sleep(self.MSG_INTERVAL - x) + #Second throttling mechanism: limited kB/s rate + #over the most recent period. + q = time.time() - self.B_PER_SEC_INTERVAL + #clean out old messages + self.msg_buffer = [_ for _ in self.msg_buffer if _[1] > q] + bytes_recent = sum(len(i[0]) for i in self.msg_buffer) + if bytes_recent > self.B_PER_SEC * self.B_PER_SEC_INTERVAL: + #should wait some appreciable fraction of B_PER_SEC_INTERVAL + log.debug("Throttling triggered, with: "+str( + bytes_recent)+ " bytes in the last "+str( + self.B_PER_SEC_INTERVAL)+" seconds.") + time.sleep(int(self.B_PER_SEC_INTERVAL/2.0)) + try: + throttled_msg = self.irc.throttleQ.get(block=False) + except Queue.Empty: + try: + throttled_msg = self.irc.obQ.get(block=False) + except Queue.Empty: + #this code *should* be unreachable. + continue + self.irc.sock.sendall(throttled_msg+'\r\n') + last_msg_time = time.time() + self.msg_buffer.append((throttled_msg, last_msg_time)) + self.irc.lockthrottle.wait() + self.irc.lockthrottle.release() + + log.debug("Ended throttling thread.") + class PingThread(threading.Thread): def __init__(self, irc): @@ -101,7 +153,7 @@ class IRCMessageChannel(MessageChannel): # close implies it will attempt to reconnect def close(self): try: - self.send_raw("QUIT") + self.sock.sendall("QUIT\r\n") except IOError as e: log.debug('errored while trying to quit: ' + repr(e)) @@ -206,9 +258,18 @@ def __privmsg(self, nick, cmd, message): self.send_raw(header + m + trailer) def send_raw(self, line): - # if not line.startswith('PING LAG'): - # log.debug('sendraw ' + line) - self.sock.sendall(line + '\r\n') + # Messages are queued and prioritised. + # This is an addressing of github #300 + if line.startswith("PING") or line.startswith("PONG"): + self.sock.sendall(line + '\r\n') + else: + if "relorder" in line or "absorder" in line: + self.obQ.put(line) + else: + self.throttleQ.put(line) + self.lockthrottle.acquire() + self.lockthrottle.notify() + self.lockthrottle.release() def check_for_orders(self, nick, _chunks): if _chunks[0] in jm_single().ordername_list: @@ -511,6 +572,8 @@ def __init__(self, if password and len(password) == 0: password = None self.given_password = password + self.throttleQ = Queue.Queue() + self.obQ = Queue.Queue() def run(self): self.waiting = {} @@ -518,7 +581,9 @@ def run(self): self.give_up = False self.ping_reply = True self.lockcond = threading.Condition() + self.lockthrottle = threading.Condition() PingThread(self).start() + ThrottleThread(self).start() while not self.give_up: try: diff --git a/joinmarket/taker.py b/joinmarket/taker.py index cd892ae0..471d6336 100644 --- a/joinmarket/taker.py +++ b/joinmarket/taker.py @@ -6,6 +6,7 @@ import random import sqlite3 import sys +import time import threading from decimal import InvalidOperation, Decimal @@ -508,6 +509,7 @@ def start_cj(self, finishcallback=None, choose_orders_recover=None, auth_addr=None): + self.cjtx = None self.cjtx = CoinJoinTX( self.msgchan, wallet, self.db, cj_amount, orders, input_utxos, my_cj_addr, my_change_addr, @@ -518,6 +520,10 @@ def on_error(self): pass # TODO implement def on_pubkey(self, nick, maker_pubkey): + #It's possible that the CoinJoinTX object is + #not yet created (__init__ call not finished). + while not self.cjtx: + time.sleep(0.5) self.cjtx.start_encryption(nick, maker_pubkey) def on_ioauth(self, nick, utxo_list, cj_pub, change_addr, btc_sig):