From 121c0fec7cdcf1c3ca6873bd09d2294f4f00dc40 Mon Sep 17 00:00:00 2001 From: Adam Gibson Date: Tue, 15 Dec 2015 21:38:15 +0200 Subject: [PATCH] throttling with Queue Two throttling mechanisms: a limit to lines per second allowed, and a limit of bytes/second over a recent interval. These settings are currently in the constructor of the ThrottleThread. Change maker_timeout_seconds default to 60 for large txs. Remove sleep hack in sig sending, no longer needed. Minor update: tumbler test -w wait time reset to 10s. Minor additional update: address issue #185. --- joinmarket/configure.py | 2 +- joinmarket/irc.py | 84 ++++++++++++++++++++++++++++++++++++++--- joinmarket/taker.py | 6 +++ test/tumbler-test.py | 2 +- 4 files changed, 86 insertions(+), 8 deletions(-) diff --git a/joinmarket/configure.py b/joinmarket/configure.py index 8126cb72..6a15088d 100644 --- a/joinmarket/configure.py +++ b/joinmarket/configure.py @@ -77,7 +77,7 @@ def __getitem__(self, key): global_singleton.DUST_THRESHOLD = 2730 global_singleton.bc_interface = None global_singleton.ordername_list = ['absorder', 'relorder'] -global_singleton.maker_timeout_sec = 30 +global_singleton.maker_timeout_sec = 60 global_singleton.debug_file_lock = threading.Lock() global_singleton.debug_file_handle = None global_singleton.core_alert = None diff --git a/joinmarket/irc.py b/joinmarket/irc.py index f7b5253a..15be2446 100644 --- a/joinmarket/irc.py +++ b/joinmarket/irc.py @@ -6,6 +6,7 @@ import ssl import threading import time +import Queue from joinmarket.configure import jm_single, get_config_irc_channel from joinmarket.message_channel import MessageChannel, CJPeerError @@ -58,6 +59,66 @@ def get_irc_nick(source): return source[1:source.find('!')] +class ThrottleThread(threading.Thread): + + def __init__(self, irc): + threading.Thread.__init__(self) + self.daemon = True + self.irc = irc + self.msg_buffer = [] + #TODO - probably global configuration? + self.MSG_INTERVAL = 1.0 + self.B_PER_SEC = 300 + self.B_PER_SEC_INTERVAL = 10.0 + + def run(self): + log.debug("starting throttle thread") + last_msg_time = 0 + while not self.irc.give_up: + self.irc.lockthrottle.acquire() + self.irc.lockthrottle.wait() + self.irc.lockthrottle.release() + while not (self.irc.throttleQ.empty() and self.irc.obQ.empty() + and self.irc.pingQ.empty()): + time.sleep(0.2) #need to avoid cpu spinning if throttled + try: + pingmsg = self.irc.pingQ.get(block=False) + #ping messages are not counted to throttling totals, + #so send immediately + self.irc.sock.sendall(pingmsg + '\r\n') + continue + except Queue.Empty: + pass + #First throttling mechanism: no more than 1 line + #per self.MSG_INTERVAL seconds. + x = time.time() - last_msg_time + if x < self.MSG_INTERVAL: + continue + #Second throttling mechanism: limited kB/s rate + #over the most recent period. + q = time.time() - self.B_PER_SEC_INTERVAL + #clean out old messages + self.msg_buffer = [_ for _ in self.msg_buffer if _[1] > q] + bytes_recent = sum(len(i[0]) for i in self.msg_buffer) + if bytes_recent > self.B_PER_SEC * self.B_PER_SEC_INTERVAL: + log.debug("Throttling triggered, with: "+str( + bytes_recent)+ " bytes in the last "+str( + self.B_PER_SEC_INTERVAL)+" seconds.") + continue + try: + throttled_msg = self.irc.throttleQ.get(block=False) + except Queue.Empty: + try: + throttled_msg = self.irc.obQ.get(block=False) + except Queue.Empty: + #this code *should* be unreachable. + continue + self.irc.sock.sendall(throttled_msg+'\r\n') + last_msg_time = time.time() + self.msg_buffer.append((throttled_msg, last_msg_time)) + + log.debug("Ended throttling thread.") + class PingThread(threading.Thread): def __init__(self, irc): @@ -101,7 +162,7 @@ class IRCMessageChannel(MessageChannel): # close implies it will attempt to reconnect def close(self): try: - self.send_raw("QUIT") + self.sock.sendall("QUIT\r\n") except IOError as e: log.debug('errored while trying to quit: ' + repr(e)) @@ -172,8 +233,6 @@ def send_sigs(self, nick, sig_list): # TODO make it send the sigs on one line if there's space for s in sig_list: self.__privmsg(nick, 'sig', s) - time.sleep( - 0.5) # HACK! really there should be rate limiting, see issue#31 def __pubmsg(self, message): log.debug('>>pubmsg ' + message) @@ -206,9 +265,17 @@ def __privmsg(self, nick, cmd, message): self.send_raw(header + m + trailer) def send_raw(self, line): - # if not line.startswith('PING LAG'): - # log.debug('sendraw ' + line) - self.sock.sendall(line + '\r\n') + # Messages are queued and prioritised. + # This is an addressing of github #300 + if line.startswith("PING") or line.startswith("PONG"): + self.pingQ.put(line) + elif "relorder" in line or "absorder" in line: + self.obQ.put(line) + else: + self.throttleQ.put(line) + self.lockthrottle.acquire() + self.lockthrottle.notify() + self.lockthrottle.release() def check_for_orders(self, nick, _chunks): if _chunks[0] in jm_single().ordername_list: @@ -511,6 +578,9 @@ def __init__(self, if password and len(password) == 0: password = None self.given_password = password + self.pingQ = Queue.Queue() + self.throttleQ = Queue.Queue() + self.obQ = Queue.Queue() def run(self): self.waiting = {} @@ -518,7 +588,9 @@ def run(self): self.give_up = False self.ping_reply = True self.lockcond = threading.Condition() + self.lockthrottle = threading.Condition() PingThread(self).start() + ThrottleThread(self).start() while not self.give_up: try: diff --git a/joinmarket/taker.py b/joinmarket/taker.py index cd892ae0..471d6336 100644 --- a/joinmarket/taker.py +++ b/joinmarket/taker.py @@ -6,6 +6,7 @@ import random import sqlite3 import sys +import time import threading from decimal import InvalidOperation, Decimal @@ -508,6 +509,7 @@ def start_cj(self, finishcallback=None, choose_orders_recover=None, auth_addr=None): + self.cjtx = None self.cjtx = CoinJoinTX( self.msgchan, wallet, self.db, cj_amount, orders, input_utxos, my_cj_addr, my_change_addr, @@ -518,6 +520,10 @@ def on_error(self): pass # TODO implement def on_pubkey(self, nick, maker_pubkey): + #It's possible that the CoinJoinTX object is + #not yet created (__init__ call not finished). + while not self.cjtx: + time.sleep(0.5) self.cjtx.start_encryption(nick, maker_pubkey) def on_ioauth(self, nick, utxo_list, cj_pub, change_addr, btc_sig): diff --git a/test/tumbler-test.py b/test/tumbler-test.py index f8ce7b95..3f13a1db 100644 --- a/test/tumbler-test.py +++ b/test/tumbler-test.py @@ -75,7 +75,7 @@ def run_tumble(self, amt): test_in = ['y'] p = pexpect.spawn(python_cmd, ['tumbler.py', '-N', '2', '0', '-a', '0', '-M', - '5', '-w', '3', '-l', '0.2', '-s', '1000000', + '5', '-w', '10', '-l', '0.2', '-s', '1000000', self.wallets[6]['seed'], dest_address]) interact(p, test_in, expected) p.expect(pexpect.EOF, timeout=100000)