Skip to content

Commit

Permalink
Implement responsive mempool sync
Browse files Browse the repository at this point in the history
Previously when generating fee histogram required by Electrum, the server would
use the RPC call `getrawmempool true` which would be very slow during times of
large mempools, and cause the server to be unresponsive.

This commit instead uses `getrawmempool false` and `getmempoolentry` to obtain
all the mempool fees. Because the mempool synchronization is split up over
many different RPC calls, the server can always remain responsive even while
obtaining the mempool. The typical lag will be at most 1 or 2 seconds.

See issue #96
  • Loading branch information
chris-belcher committed Mar 6, 2021
1 parent fcbd3ce commit 136b957
Show file tree
Hide file tree
Showing 5 changed files with 180 additions and 52 deletions.
4 changes: 4 additions & 0 deletions config.ini_sample
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,10 @@ keyfile = certs/cert.key
# This is useful on low powered devices at times when the node mempool is large
disable_mempool_fee_histogram = false

# How often in seconds to update the mempool
# this mempool is used to calculate the mempool fee histogram
mempool_update_interval = 60

# Parameter for broadcasting unconfirmed transactions
# Options are:
# * tor-or-own-node (use tor if tor is running locally, otherwise own-node)
Expand Down
3 changes: 3 additions & 0 deletions electrumpersonalserver/server/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,3 +46,6 @@
get_block_headers_hex,
DONATION_ADDR,
)
from electrumpersonalserver.server.mempoolhistogram import (
MempoolSync
)
66 changes: 54 additions & 12 deletions electrumpersonalserver/server/common.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import socket
import time
import datetime
from datetime import datetime
import ssl
import os
import os.path
Expand All @@ -26,12 +26,23 @@
get_block_headers_hex,
DONATION_ADDR,
)
from electrumpersonalserver.server.mempoolhistogram import (
MempoolSync,
PollIntervalChange
)

##python has demented rules for variable scope, so these
## global variables are actually mutable lists
bestblockhash = [None]

def on_heartbeat_listening(txmonitor):
last_heartbeat_listening = [datetime.now()]
last_heartbeat_connected = [datetime.now()]

def on_heartbeat_listening(poll_interval_listening, txmonitor):
if ((datetime.now() - last_heartbeat_listening[0]).total_seconds()
< poll_interval_listening):
return True
last_heartbeat_listening[0] = datetime.now()
logger = logging.getLogger('ELECTRUMPERSONALSERVER')
try:
txmonitor.check_for_updated_txes()
Expand All @@ -40,7 +51,11 @@ def on_heartbeat_listening(txmonitor):
is_node_reachable = False
return is_node_reachable

def on_heartbeat_connected(rpc, txmonitor, protocol):
def on_heartbeat_connected(poll_interval_connected, rpc, txmonitor, protocol):
if ((datetime.now() - last_heartbeat_connected[0]).total_seconds()
< poll_interval_connected):
return
last_heartbeat_connected[0] = datetime.now()
logger = logging.getLogger('ELECTRUMPERSONALSERVER')
is_tip_updated, header = check_for_new_blockchain_tip(rpc,
protocol.are_headers_raw)
Expand Down Expand Up @@ -90,17 +105,26 @@ def run_electrum_server(rpc, txmonitor, config):
logger.debug('using cert: {}, key: {}'.format(certfile, keyfile))
disable_mempool_fee_histogram = config.getboolean("electrum-server",
"disable_mempool_fee_histogram", fallback=False)
mempool_update_interval = int(config.get("bitcoin-rpc",
"mempool_update_interval", fallback=60))
broadcast_method = config.get("electrum-server", "broadcast_method",
fallback="own-node")
tor_host = config.get("electrum-server", "tor_host", fallback="localhost")
tor_port = int(config.get("electrum-server", "tor_port", fallback="9050"))
tor_hostport = (tor_host, tor_port)

mempool_sync = MempoolSync(rpc,
disable_mempool_fee_histogram, mempool_update_interval)
mempool_sync.initial_sync(logger)

protocol = ElectrumProtocol(rpc, txmonitor, logger, broadcast_method,
tor_hostport, disable_mempool_fee_histogram)
tor_hostport, mempool_sync)

normal_listening_timeout = min(poll_interval_listening,
mempool_update_interval)
fast_listening_timeout = 0.5
server_sock = create_server_socket(hostport)
server_sock.settimeout(poll_interval_listening)
server_sock.settimeout(normal_listening_timeout)
accepting_clients = True
while True:
# main server loop, runs forever
Expand All @@ -121,7 +145,14 @@ def run_electrum_server(rpc, txmonitor, config):
certfile=certfile, keyfile=keyfile,
ssl_version=ssl.PROTOCOL_SSLv23)
except socket.timeout:
is_node_reachable = on_heartbeat_listening(txmonitor)
poll_interval_change = mempool_sync.poll_update(1)
if poll_interval_change == PollIntervalChange.FAST_POLLING:
server_sock.settimeout(fast_listening_timeout)
elif poll_interval_change == PollIntervalChange.NORMAL_POLLING:
server_sock.settimeout(normal_listening_timeout)

is_node_reachable = on_heartbeat_listening(
poll_interval_listening, txmonitor)
accepting_clients = is_node_reachable
except (ConnectionRefusedError, ssl.SSLError, IOError):
sock.close()
Expand All @@ -135,7 +166,10 @@ def send_reply_fun(reply):
protocol.set_send_reply_fun(send_reply_fun)

try:
sock.settimeout(poll_interval_connected)
normal_connected_timeout = min(poll_interval_connected,
mempool_update_interval)
fast_connected_timeout = 0.5
sock.settimeout(normal_connected_timeout)
recv_buffer = bytearray()
while True:
# loop for replying to client queries
Expand All @@ -159,7 +193,15 @@ def send_reply_fun(reply):
logger.debug("=> " + line)
protocol.handle_query(query)
except socket.timeout:
on_heartbeat_connected(rpc, txmonitor, protocol)
poll_interval_change = mempool_sync.poll_update(1)
if poll_interval_change == PollIntervalChange.FAST_POLLING:
sock.settimeout(fast_connected_timeout)
elif (poll_interval_change
== PollIntervalChange.NORMAL_POLLING):
sock.settimeout(normal_connected_timeout)

on_heartbeat_connected(poll_interval_connected, rpc,
txmonitor, protocol)
except JsonRpcError as e:
logger.debug("Error with node connection, e = " + repr(e)
+ "\ntraceback = " + str(traceback.format_exc()))
Expand Down Expand Up @@ -454,22 +496,22 @@ def main():

def search_for_block_height_of_date(datestr, rpc):
logger = logging.getLogger('ELECTRUMPERSONALSERVER')
target_time = datetime.datetime.strptime(datestr, "%d/%m/%Y")
target_time = datetime.strptime(datestr, "%d/%m/%Y")
bestblockhash = rpc.call("getbestblockhash", [])
best_head = rpc.call("getblockheader", [bestblockhash])
if target_time > datetime.datetime.fromtimestamp(best_head["time"]):
if target_time > datetime.fromtimestamp(best_head["time"]):
logger.error("date in the future")
return -1
genesis_block = rpc.call("getblockheader", [rpc.call("getblockhash", [0])])
if target_time < datetime.datetime.fromtimestamp(genesis_block["time"]):
if target_time < datetime.fromtimestamp(genesis_block["time"]):
logger.warning("date is before the creation of bitcoin")
return 0
first_height = 0
last_height = best_head["height"]
while True:
m = (first_height + last_height) // 2
m_header = rpc.call("getblockheader", [rpc.call("getblockhash", [m])])
m_header_time = datetime.datetime.fromtimestamp(m_header["time"])
m_header_time = datetime.fromtimestamp(m_header["time"])
m_time_diff = (m_header_time - target_time).total_seconds()
if abs(m_time_diff) < 60*60*2: #2 hours
return m_header["height"]
Expand Down
45 changes: 5 additions & 40 deletions electrumpersonalserver/server/electrumprotocol.py
Original file line number Diff line number Diff line change
Expand Up @@ -134,19 +134,18 @@ class ElectrumProtocol(object):
"""

def __init__(self, rpc, txmonitor, logger, broadcast_method,
tor_hostport, disable_mempool_fee_histogram):
tor_hostport, mempool_sync):
self.rpc = rpc
self.txmonitor = txmonitor
self.logger = logger
self.broadcast_method = broadcast_method
self.tor_hostport = tor_hostport
self.disable_mempool_fee_histogram = disable_mempool_fee_histogram
self.mempool_sync = mempool_sync

self.protocol_version = 0
self.subscribed_to_headers = False
self.are_headers_raw = False
self.txid_blockhash_map = {}
self.printed_slow_mempool_warning = False

def set_send_reply_fun(self, send_reply_fun):
self.send_reply_fun = send_reply_fun
Expand Down Expand Up @@ -379,43 +378,9 @@ def handle_query(self, query):
else:
self._send_error(query["id"], error)
elif method == "mempool.get_fee_histogram":
if self.disable_mempool_fee_histogram:
result = [[0, 0]]
self.logger.debug("fee histogram disabled, sending back empty "
+ "mempool")
else:
st = time.time()
mempool = self.rpc.call("getrawmempool", [True])
et = time.time()
MEMPOOL_WARNING_DURATION = 10 #seconds
if et - st > MEMPOOL_WARNING_DURATION:
if not self.printed_slow_mempool_warning:
self.logger.warning("Mempool very large resulting in"
+ " slow response by server ("
+ str(round(et-st, 1)) + "sec). Consider setting "
+ "`disable_mempool_fee_histogram = true`")
self.printed_slow_mempool_warning = True
#algorithm copied from the relevant place in ElectrumX
#https://github.com/kyuupichan/electrumx/blob/e92c9bd4861c1e35989ad2773d33e01219d33280/server/mempool.py
fee_hist = defaultdict(int)
for txid, details in mempool.items():
size = (details["size"] if "size" in details else
details["vsize"])
fee_rate = 1e8*details["fee"] // size
fee_hist[fee_rate] += size
l = list(reversed(sorted(fee_hist.items())))
out = []
size = 0
r = 0
binsize = 100000
for fee, s in l:
size += s
if size + r > binsize:
out.append((fee, size))
r += size - binsize
size = 0
binsize *= 1.1
result = out
result = self.mempool_sync.get_fee_histogram()
self.logger.debug("mempool entry count = "
+ str(len(self.mempool_sync.mempool)))
self._send_response(query, result)
elif method == "blockchain.estimatefee":
estimate = self.rpc.call("estimatesmartfee", [query["params"][0]])
Expand Down
114 changes: 114 additions & 0 deletions electrumpersonalserver/server/mempoolhistogram.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@

import time
from collections import defaultdict
from datetime import datetime
from enum import Enum

from electrumpersonalserver.server.jsonrpc import JsonRpcError

def calc_histogram(mempool):
#algorithm copied from the relevant place in ElectrumX
#https://github.com/kyuupichan/electrumx/blob/e92c9bd4861c1e35989ad2773d33e01219d33280/server/mempool.py
fee_hist = defaultdict(int)
for fee_rate, size in mempool.values():
fee_hist[fee_rate] += size
l = list(reversed(sorted(fee_hist.items())))
out = []
size = 0
r = 0
binsize = 100000
for fee, s in l:
size += s
if size + r > binsize:
out.append((fee, size))
r += size - binsize
size = 0
binsize *= 1.1
return out

class PollIntervalChange(Enum):
UNCHANGED = "unchanged"
FAST_POLLING = "fastpolling"
NORMAL_POLLING = "normalpolling"

class MempoolSync(object):
def __init__(self, rpc, disabled, polling_interval):
self.rpc = rpc
self.disabled = disabled
self.polling_interval = polling_interval
self.mempool = dict()
self.cached_fee_histogram = [[0, 0]]
self.added_txids = None
self.last_poll = None
self.state = "gettxids"

def set_polling_interval(self, polling_interval):
self.polling_interval = polling_interval

def get_fee_histogram(self):
return self.cached_fee_histogram

def initial_sync(self, logger):
if self.disabled:
return
logger.info("Synchronizing mempool . . .")
st = time.time()
for _ in range(2):
self.poll_update(-1)
self.state = "gettxids"
for _ in range(2):
self.poll_update(-1)
#run once for the getrawmempool
#again for the getmempoolentry
#and all that again because the first time will take so long
# that new txes could arrive in that time
et = time.time()
logger.info("Found " + str(len(self.mempool)) + " mempool entries. "
+ "Synchronized mempool in " + str(et - st) + "sec")

#-1 for no timeout
def poll_update(self, timeout):
poll_interval_change = PollIntervalChange.UNCHANGED
if self.disabled:
return poll_interval_change
if self.state == "waiting":
if ((datetime.now() - self.last_poll).total_seconds()
> self.polling_interval):
poll_interval_change = PollIntervalChange.FAST_POLLING
self.state = "gettxids"
elif self.state == "gettxids":
mempool_txids = self.rpc.call("getrawmempool", [])
self.last_poll = datetime.now()
mempool_txids = set(mempool_txids)

removed_txids = set(self.mempool.keys()).difference(mempool_txids)
self.added_txids = iter(mempool_txids.difference(
set(self.mempool.keys())))

for txid in removed_txids:
del self.mempool[txid]

self.state = "getfeerates"
elif self.state == "getfeerates":
if timeout == -1:
timeout = 2**32
start_time = datetime.now()
while self.state != "waiting" and ((datetime.now() - start_time
).total_seconds() < timeout):
try:
txid = next(self.added_txids)
except StopIteration:
self.cached_fee_histogram = calc_histogram(self.mempool)
self.state = "waiting"
poll_interval_change = \
PollIntervalChange.NORMAL_POLLING
self.last_poll = datetime.now()
continue
try:
mempool_tx = self.rpc.call("getmempoolentry", [txid])
except JsonRpcError:
continue
fee_rate = 1e8*mempool_tx["fee"] // mempool_tx["vsize"]
self.mempool[txid] = (fee_rate, mempool_tx["vsize"])

return poll_interval_change

0 comments on commit 136b957

Please sign in to comment.