Skip to content
This repository has been archived by the owner on May 13, 2022. It is now read-only.

Commit

Permalink
throttling with Queue
Browse files Browse the repository at this point in the history
Two throttling mechanisms: a limit to lines per second allowed,
and a limit of bytes/second over a recent interval. These settings
are currently in the constructor of the ThrottleThread.
Change maker_timeout_seconds default to 60 for large txs.
Remove sleep hack in sig sending, no longer needed.
Minor updates: tumbler test -w wait time reset to 10s, extra
travis bot logs.
Minor additional update: address issue #185.
  • Loading branch information
AdamISZ committed Dec 20, 2015
1 parent 67043a2 commit a33a851
Show file tree
Hide file tree
Showing 5 changed files with 89 additions and 8 deletions.
2 changes: 2 additions & 0 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ script:
- cd test
- python regtest.py
- python wallet-test.py
- cd ../logs
- for x in `ls`; do tail -50 $x; done
branches:
only:
- develop
2 changes: 1 addition & 1 deletion joinmarket/configure.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ def __getitem__(self, key):
global_singleton.DUST_THRESHOLD = 2730
global_singleton.bc_interface = None
global_singleton.ordername_list = ['absorder', 'relorder']
global_singleton.maker_timeout_sec = 30
global_singleton.maker_timeout_sec = 60
global_singleton.debug_file_lock = threading.Lock()
global_singleton.debug_file_handle = None
global_singleton.core_alert = None
Expand Down
85 changes: 79 additions & 6 deletions joinmarket/irc.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import ssl
import threading
import time
import Queue

from joinmarket.configure import jm_single, get_config_irc_channel
from joinmarket.message_channel import MessageChannel, CJPeerError
Expand Down Expand Up @@ -58,6 +59,67 @@ def get_irc_nick(source):
return source[1:source.find('!')]


class ThrottleThread(threading.Thread):

def __init__(self, irc):
threading.Thread.__init__(self)
self.daemon = True
self.irc = irc
self.msg_buffer = []
#TODO - probably global configuration?
self.MSG_INTERVAL = 1.0
self.B_PER_SEC = 300
self.B_PER_SEC_INTERVAL = 10.0

def run(self):
log.debug("starting throttle thread")
last_msg_time = 0
while not self.irc.give_up:
self.irc.lockthrottle.acquire()
while not (self.irc.throttleQ.empty() and self.irc.obQ.empty()
and self.irc.pingQ.empty()):
time.sleep(0.2) #need to avoid cpu spinning if throttled
try:
pingmsg = self.irc.pingQ.get(block=False)
#ping messages are not counted to throttling totals,
#so send immediately
log.debug("sending: "+pingmsg)
self.irc.sock.sendall(pingmsg + '\r\n')
continue
except Queue.Empty:
pass
#First throttling mechanism: no more than 1 line
#per self.MSG_INTERVAL seconds.
x = time.time() - last_msg_time
if x < self.MSG_INTERVAL:
continue
#Second throttling mechanism: limited kB/s rate
#over the most recent period.
q = time.time() - self.B_PER_SEC_INTERVAL
#clean out old messages
self.msg_buffer = [_ for _ in self.msg_buffer if _[1] > q]
bytes_recent = sum(len(i[0]) for i in self.msg_buffer)
if bytes_recent > self.B_PER_SEC * self.B_PER_SEC_INTERVAL:
log.debug("Throttling triggered, with: "+str(
bytes_recent)+ " bytes in the last "+str(
self.B_PER_SEC_INTERVAL)+" seconds.")
continue
try:
throttled_msg = self.irc.throttleQ.get(block=False)
except Queue.Empty:
try:
throttled_msg = self.irc.obQ.get(block=False)
except Queue.Empty:
#this code *should* be unreachable.
continue
self.irc.sock.sendall(throttled_msg+'\r\n')
last_msg_time = time.time()
self.msg_buffer.append((throttled_msg, last_msg_time))
self.irc.lockthrottle.wait()
self.irc.lockthrottle.release()

log.debug("Ended throttling thread.")

class PingThread(threading.Thread):

def __init__(self, irc):
Expand Down Expand Up @@ -101,7 +163,7 @@ class IRCMessageChannel(MessageChannel):
# close implies it will attempt to reconnect
def close(self):
try:
self.send_raw("QUIT")
self.sock.sendall("QUIT\r\n")
except IOError as e:
log.debug('errored while trying to quit: ' + repr(e))

Expand Down Expand Up @@ -172,8 +234,6 @@ def send_sigs(self, nick, sig_list):
# TODO make it send the sigs on one line if there's space
for s in sig_list:
self.__privmsg(nick, 'sig', s)
time.sleep(
0.5) # HACK! really there should be rate limiting, see issue#31

def __pubmsg(self, message):
log.debug('>>pubmsg ' + message)
Expand Down Expand Up @@ -206,9 +266,17 @@ def __privmsg(self, nick, cmd, message):
self.send_raw(header + m + trailer)

def send_raw(self, line):
# if not line.startswith('PING LAG'):
# log.debug('sendraw ' + line)
self.sock.sendall(line + '\r\n')
# Messages are queued and prioritised.
# This is an addressing of github #300
if line.startswith("PING") or line.startswith("PONG"):
self.pingQ.put(line)
elif "relorder" in line or "absorder" in line:
self.obQ.put(line)
else:
self.throttleQ.put(line)
self.lockthrottle.acquire()
self.lockthrottle.notify()
self.lockthrottle.release()

def check_for_orders(self, nick, _chunks):
if _chunks[0] in jm_single().ordername_list:
Expand Down Expand Up @@ -511,14 +579,19 @@ def __init__(self,
if password and len(password) == 0:
password = None
self.given_password = password
self.pingQ = Queue.Queue()
self.throttleQ = Queue.Queue()
self.obQ = Queue.Queue()

def run(self):
self.waiting = {}
self.built_privmsg = {}
self.give_up = False
self.ping_reply = True
self.lockcond = threading.Condition()
self.lockthrottle = threading.Condition()
PingThread(self).start()
ThrottleThread(self).start()

while not self.give_up:
try:
Expand Down
6 changes: 6 additions & 0 deletions joinmarket/taker.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import random
import sqlite3
import sys
import time
import threading
from decimal import InvalidOperation, Decimal

Expand Down Expand Up @@ -508,6 +509,7 @@ def start_cj(self,
finishcallback=None,
choose_orders_recover=None,
auth_addr=None):
self.cjtx = None
self.cjtx = CoinJoinTX(
self.msgchan, wallet, self.db, cj_amount, orders,
input_utxos, my_cj_addr, my_change_addr,
Expand All @@ -518,6 +520,10 @@ def on_error(self):
pass # TODO implement

def on_pubkey(self, nick, maker_pubkey):
#It's possible that the CoinJoinTX object is
#not yet created (__init__ call not finished).
while not self.cjtx:
time.sleep(0.5)
self.cjtx.start_encryption(nick, maker_pubkey)

def on_ioauth(self, nick, utxo_list, cj_pub, change_addr, btc_sig):
Expand Down
2 changes: 1 addition & 1 deletion test/tumbler-test.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ def run_tumble(self, amt):
test_in = ['y']
p = pexpect.spawn(python_cmd,
['tumbler.py', '-N', '2', '0', '-a', '0', '-M',
'5', '-w', '3', '-l', '0.2', '-s', '1000000',
'5', '-w', '10', '-l', '0.2', '-s', '1000000',
self.wallets[6]['seed'], dest_address])
interact(p, test_in, expected)
p.expect(pexpect.EOF, timeout=100000)
Expand Down

0 comments on commit a33a851

Please sign in to comment.