Skip to content

Commit

Permalink
Move getnodeaddresses RPC calls to main thread
Browse files Browse the repository at this point in the history
This avoids a RPC concurrency issue arising from having
8 threads connect to the node RPC port at once
  • Loading branch information
chris-belcher committed Jun 23, 2019
1 parent fe44aa7 commit dd96954
Show file tree
Hide file tree
Showing 2 changed files with 50 additions and 29 deletions.
15 changes: 5 additions & 10 deletions electrumpersonalserver/server/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,13 @@
from ipaddress import ip_network, ip_address
import logging
import tempfile
import threading

from electrumpersonalserver.server.jsonrpc import JsonRpc, JsonRpcError
import electrumpersonalserver.server.hashes as hashes
import electrumpersonalserver.server.merkleproof as merkleproof
import electrumpersonalserver.server.deterministicwallet as deterministicwallet
import electrumpersonalserver.server.transactionmonitor as transactionmonitor
import electrumpersonalserver.server.peertopeer as p2p
import electrumpersonalserver.server.peertopeer as peertopeer

SERVER_VERSION_NUMBER = "0.1.7"

Expand Down Expand Up @@ -247,21 +246,17 @@ def handle_query(sock, line, rpc, txmonitor, disable_mempool_fee_histogram,
try:
rpc.call("sendrawtransaction", [txhex])
except JsonRpcError as e:
pass
logger.error("Error broadcasting: " + repr(e))
elif broadcast_method == "tor":
TOR_CONNECTIONS = 8
network = "mainnet"
chaininfo = rpc.call("getblockchaininfo", [])
if chaininfo["chain"] == "test":
network = "testnet"
elif chaininfo["chain"] == "regtest":
network = "regtest"
for i in range(TOR_CONNECTIONS):
t = threading.Thread(target=p2p.tor_broadcast_tx,
args=(txhex, tor_hostport, network, rpc, logger),
daemon=True)
t.start()
time.sleep(0.1)
logger.debug("broadcasting to network: " + network)
peertopeer.tor_broadcast_tx(txhex, tor_hostport, network, rpc,
logger)
elif broadcast_method.startswith("system "):
with tempfile.NamedTemporaryFile() as fd:
system_line = broadcast_method[7:].replace("%s", fd.name)
Expand Down
64 changes: 45 additions & 19 deletions electrumpersonalserver/server/peertopeer.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import socket, time
import base64
import threading
from struct import pack, unpack
from datetime import datetime

Expand All @@ -13,8 +14,6 @@
)
from electrumpersonalserver.server.jsonrpc import JsonRpcError

import logging

PROTOCOL_VERSION = 70012
DEFAULT_USER_AGENT = '/Satoshi:0.18.0/'
NODE_WITNESS = (1 << 3)
Expand Down Expand Up @@ -187,9 +186,9 @@ def run(self):
+ pack('<I', start_height)
+ b'\x01')

self.logger.debug('Connecting to bitcoin peer (magic=' +
hex(self.magic) + ') at ' + str(self.remote_hostport) +
' with proxy ' + str(self.socks5_hostport))
self.logger.debug('Connecting to bitcoin peer at ' +
str(self.remote_hostport) + ' with proxy ' +
str(self.socks5_hostport))
setdefaultproxy(PROXY_TYPE_SOCKS5, self.socks5_hostport[0],
self.socks5_hostport[1], True)
self.sock = socksocket()
Expand Down Expand Up @@ -325,31 +324,23 @@ def handle_message(self, p2p, command, length, payload):
hash_id = payload[ptr[0] : ptr[0] + 32]
ptr[0] += 32
if hash_id == self.txid:
self.logger.info("Uploading tx to " +
str(p2p.remote_hostport))
p2p.sock.sendall(p2p.create_message('tx', self.txhex))
self.uploaded_tx = True
self.logger.info("Uploaded transaction via tor to peer at "
+ str(p2p.remote_hostport))
p2p.close()

def tor_broadcast_tx(txhex, tor_hostport, network, rpc, logger):
ATTEMPTS = 8 # how many times to search for a node that accepts txes
try:
node_addrs = rpc.call("getnodeaddresses", [ATTEMPTS])
except JsonRpcError as e:
logger.debug(repr(e))
logger.error("BitcoinCore v0.18.0 is required to broadcast through Tor")
return False
node_addrs = [a for a in node_addrs if a["services"] & NODE_WITNESS]
for i in range(len(node_addrs)):
remote_hostport = (node_addrs[i]["address"], node_addrs[i]["port"])
def broadcaster_thread(txhex, node_addrs, tor_hostport, network, rpc, logger):
for node_addr in node_addrs:
remote_hostport = (node_addr["address"], node_addr["port"])
p2p_msg_handler = P2PBroadcastTx(txhex, logger)
p2p = P2PProtocol(p2p_msg_handler, remote_hostport=remote_hostport,
network=network, logger=logger, socks5_hostport=tor_hostport,
heartbeat_interval=20)
try:
p2p.run()
except IOError as e:
logger.debug("p2p.run(): " + repr(e))
logger.debug("p2p.run() exited: " + repr(e))
continue
if p2p_msg_handler.uploaded_tx:
break
Expand All @@ -358,3 +349,38 @@ def tor_broadcast_tx(txhex, tor_hostport, network, rpc, logger):
# return false if never found a node that accepted unconfirms
return p2p_msg_handler.uploaded_tx

def chunk_list(d, n):
return [d[x:x + n] for x in range(0, len(d), n)]

def tor_broadcast_tx(txhex, tor_hostport, network, rpc, logger):
CONNECTION_THREADS = 8
CONNECTION_ATTEMPTS_PER_THREAD = 10

required_address_count = CONNECTION_ATTEMPTS_PER_THREAD * CONNECTION_THREADS
node_addrs_witness = []
while True:
try:
new_node_addrs = rpc.call("getnodeaddresses",
[required_address_count])
except JsonRpcError as e:
logger.debug(repr(e))
logger.error("Bitcoin Core v0.18.0 or higher is required "
"to broadcast through Tor")
return False
node_addrs_witness.extend(
[a for a in new_node_addrs if a["services"] & NODE_WITNESS]
)
logger.debug("len(new_node_addrs) = " + str(len(new_node_addrs)) +
" len(node_addrs_witness) = " + str(len(node_addrs_witness)))
if len(node_addrs_witness) > required_address_count:
break
node_addrs_chunks = chunk_list(
node_addrs_witness[:required_address_count],
CONNECTION_ATTEMPTS_PER_THREAD
)
for node_addrs in node_addrs_chunks:
t = threading.Thread(target=broadcaster_thread,
args=(txhex, node_addrs, tor_hostport, network, rpc, logger),
daemon=True)
t.start()

0 comments on commit dd96954

Please sign in to comment.