Skip to content

Commit

Permalink
Add config option to control JSONRPC request parallelism
Browse files Browse the repository at this point in the history
  • Loading branch information
shesek committed May 23, 2024
1 parent 0ac2dbc commit a9be78d
Show file tree
Hide file tree
Showing 5 changed files with 27 additions and 4 deletions.
1 change: 1 addition & 0 deletions src/bin/electrs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ fn run_server(config: Arc<Config>) -> Result<()> {
&config.daemon_dir,
&config.blocks_dir,
config.daemon_rpc_addr,
config.daemon_parallelism,
config.cookie_getter(),
config.network_type,
signal.clone(),
Expand Down
1 change: 1 addition & 0 deletions src/bin/tx-fingerprint-stats.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ fn main() {
&config.daemon_dir,
&config.blocks_dir,
config.daemon_rpc_addr,
config.daemon_parallelism,
config.cookie_getter(),
config.network_type,
signal,
Expand Down
8 changes: 8 additions & 0 deletions src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ pub struct Config {
pub daemon_dir: PathBuf,
pub blocks_dir: PathBuf,
pub daemon_rpc_addr: SocketAddr,
pub daemon_parallelism: usize,
pub cookie: Option<String>,
pub electrum_rpc_addr: SocketAddr,
pub http_addr: SocketAddr,
Expand Down Expand Up @@ -132,6 +133,12 @@ impl Config {
.help("Bitcoin daemon JSONRPC 'addr:port' to connect (default: 127.0.0.1:8332 for mainnet, 127.0.0.1:18332 for testnet and 127.0.0.1:18443 for regtest)")
.takes_value(true),
)
.arg(
Arg::with_name("daemon_parallelism")
.long("daemon-parallelism")
.help("Number of JSONRPC requests to send in parallel")
.default_value("10")
)
.arg(
Arg::with_name("monitoring_addr")
.long("monitoring-addr")
Expand Down Expand Up @@ -386,6 +393,7 @@ impl Config {
daemon_dir,
blocks_dir,
daemon_rpc_addr,
daemon_parallelism: value_t_or_exit!(m, "daemon_parallelism", usize),
cookie,
utxos_limit: value_t_or_exit!(m, "utxos_limit", usize),
electrum_rpc_addr,
Expand Down
19 changes: 15 additions & 4 deletions src/daemon.rs
Original file line number Diff line number Diff line change
Expand Up @@ -260,6 +260,7 @@ impl Counter {

pub struct Daemon {
daemon_dir: PathBuf,
daemon_parallelism: usize,
blocks_dir: PathBuf,
network: Network,
conn: Mutex<Connection>,
Expand All @@ -276,13 +277,15 @@ impl Daemon {
daemon_dir: &PathBuf,
blocks_dir: &PathBuf,
daemon_rpc_addr: SocketAddr,
daemon_parallelism: usize,
cookie_getter: Arc<dyn CookieGetter>,
network: Network,
signal: Waiter,
metrics: &Metrics,
) -> Result<Daemon> {
let daemon = Daemon {
daemon_dir: daemon_dir.clone(),
daemon_parallelism,
blocks_dir: blocks_dir.clone(),
network,
conn: Mutex::new(Connection::new(
Expand Down Expand Up @@ -335,6 +338,7 @@ impl Daemon {
pub fn reconnect(&self) -> Result<Daemon> {
Ok(Daemon {
daemon_dir: self.daemon_dir.clone(),
daemon_parallelism: self.daemon_parallelism,
blocks_dir: self.blocks_dir.clone(),
network: self.network,
conn: Mutex::new(self.conn.lock().unwrap().reconnect()?),
Expand Down Expand Up @@ -406,10 +410,17 @@ impl Daemon {
fn requests(&self, method: &str, params_list: &[Value]) -> Result<Vec<Value>> {
// Send in parallel as individual JSONRPC requests, with no batching.
// See https://github.com/Blockstream/electrs/pull/33
params_list
.par_iter()
.map(|params| self.retry_request(method, params))
.collect()
let pool = rayon::ThreadPoolBuilder::new()
.num_threads(self.daemon_parallelism)
.thread_name(|i| format!("rpc-requests-{}", i))
.build()
.unwrap();
pool.install(|| {
params_list
.par_iter()
.map(|params| self.retry_request(method, params))
.collect()
})
}

// bitcoind JSONRPC API:
Expand Down
2 changes: 2 additions & 0 deletions tests/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ impl TestRunner {
network_type,
db_path: electrsdb.path().to_path_buf(),
daemon_dir: daemon_subdir.clone(),
daemon_parallelism: 3,
blocks_dir: daemon_subdir.join("blocks"),
daemon_rpc_addr: params.rpc_socket.into(),
cookie: None,
Expand Down Expand Up @@ -127,6 +128,7 @@ impl TestRunner {
&config.daemon_dir,
&config.blocks_dir,
config.daemon_rpc_addr,
config.daemon_parallelism,
config.cookie_getter(),
config.network_type,
signal.clone(),
Expand Down

0 comments on commit a9be78d

Please sign in to comment.