Skip to content
This repository has been archived by the owner on May 13, 2022. It is now read-only.

Commit

Permalink
throttling with Queue
Browse files Browse the repository at this point in the history
Two throttling mechanisms: a limit to lines per second allowed,
and a limit of bytes/second over a recent interval. These settings
are currently in the constructor of the ThrottleThread.
Minor additional update: address issue #185.
  • Loading branch information
AdamISZ committed Dec 16, 2015
1 parent 67043a2 commit 734bdba
Show file tree
Hide file tree
Showing 2 changed files with 77 additions and 3 deletions.
74 changes: 71 additions & 3 deletions joinmarket/irc.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import ssl
import threading
import time
import Queue

from joinmarket.configure import jm_single, get_config_irc_channel
from joinmarket.message_channel import MessageChannel, CJPeerError
Expand Down Expand Up @@ -58,6 +59,60 @@ def get_irc_nick(source):
return source[1:source.find('!')]


class ThrottleThread(threading.Thread):

def __init__(self, irc):
threading.Thread.__init__(self)
self.daemon = True
self.irc = irc
self.msg_buffer = []
#TODO - probably global configuration?
self.MSG_INTERVAL = 0.3
self.B_PER_SEC = 200
self.B_PER_SEC_INTERVAL = 10.0

def run(self):
log.debug("starting throttle thread")
last_msg_time = 0
while not self.irc.give_up:
self.irc.lockthrottle.acquire()
while not (self.irc.throttleQ.empty() and self.irc.obQ.empty()):
#First throttling mechanism: no more than 1 line
#per self.MSG_INTERVAL seconds.
x = time.time() - last_msg_time
if x < self.MSG_INTERVAL:
time.sleep(self.MSG_INTERVAL - x)
#Second throttling mechanism: limited kB/s rate
#over the most recent period.
q = time.time() - self.B_PER_SEC_INTERVAL
#clean out old messages
self.msg_buffer = [_ for _ in self.msg_buffer if _[1] > q]
log.debug("After filter, msg buffer is: "+','.join(
[_[0] for _ in self.msg_buffer]))
bytes_recent = sum(len(i[0]) for i in self.msg_buffer)
if bytes_recent > self.B_PER_SEC * self.B_PER_SEC_INTERVAL:
#should wait some appreciable fraction of B_PER_SEC_INTERVAL
log.debug("Throttling triggered, with: "+str(
bytes_recent)+ " bytes in the last "+str(
self.B_PER_SEC_INTERVAL)+" seconds.")
time.sleep(int(self.B_PER_SEC_INTERVAL/2.0))
try:
throttled_msg = self.irc.throttleQ.get(block=False)
except Queue.Empty:
try:
throttled_msg = self.irc.obQ.get(block=False)
except Queue.Empty:
#this code *should* be unreachable.
continue
pass
self.irc.sock.sendall(throttled_msg+'\r\n')
last_msg_time = time.time()
self.msg_buffer.append((throttled_msg, last_msg_time))
self.irc.lockthrottle.wait()
self.irc.lockthrottle.release()

log.debug("Ended throttling thread.")

class PingThread(threading.Thread):

def __init__(self, irc):
Expand Down Expand Up @@ -206,9 +261,18 @@ def __privmsg(self, nick, cmd, message):
self.send_raw(header + m + trailer)

def send_raw(self, line):
# if not line.startswith('PING LAG'):
# log.debug('sendraw ' + line)
self.sock.sendall(line + '\r\n')
# Messages are queued and prioritised.
# This is an addressing of github #300
if line.startswith("PING"):
self.sock.sendall(line + '\r\n')
else:
if "relorder" in line or "absorder" in line:
self.obQ.put(line)
else:
self.throttleQ.put(line)
self.lockthrottle.acquire()
self.lockthrottle.notify()
self.lockthrottle.release()

def check_for_orders(self, nick, _chunks):
if _chunks[0] in jm_single().ordername_list:
Expand Down Expand Up @@ -511,14 +575,18 @@ def __init__(self,
if password and len(password) == 0:
password = None
self.given_password = password
self.throttleQ = Queue.Queue()
self.obQ = Queue.Queue()

def run(self):
self.waiting = {}
self.built_privmsg = {}
self.give_up = False
self.ping_reply = True
self.lockcond = threading.Condition()
self.lockthrottle = threading.Condition()
PingThread(self).start()
ThrottleThread(self).start()

while not self.give_up:
try:
Expand Down
6 changes: 6 additions & 0 deletions joinmarket/taker.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import random
import sqlite3
import sys
import time
import threading
from decimal import InvalidOperation, Decimal

Expand Down Expand Up @@ -508,6 +509,7 @@ def start_cj(self,
finishcallback=None,
choose_orders_recover=None,
auth_addr=None):
self.cjtx = None
self.cjtx = CoinJoinTX(
self.msgchan, wallet, self.db, cj_amount, orders,
input_utxos, my_cj_addr, my_change_addr,
Expand All @@ -518,6 +520,10 @@ def on_error(self):
pass # TODO implement

def on_pubkey(self, nick, maker_pubkey):
#It's possible that the CoinJoinTX object is
#not yet created (__init__ call not finished).
while not self.cjtx:
time.sleep(0.5)
self.cjtx.start_encryption(nick, maker_pubkey)

def on_ioauth(self, nick, utxo_list, cj_pub, change_addr, btc_sig):
Expand Down

0 comments on commit 734bdba

Please sign in to comment.