Skip to content

Commit

Permalink
Merge pull request #5 from Samourai-Wallet/develop_dojo
Browse files Browse the repository at this point in the history
merge last modifs before v0.1.0
  • Loading branch information
kenshin-samourai authored Jan 20, 2020
2 parents 31e7539 + 87d32b7 commit 871ac2e
Show file tree
Hide file tree
Showing 4 changed files with 37 additions and 23 deletions.
1 change: 1 addition & 0 deletions TODO.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
# Indexer

* Snapshot DB after successful indexing - and run queries on the latest snapshot
* Improve management of reorgs (remove rows related to txs of reorg'd blocks from db)

# Rust

Expand Down
6 changes: 0 additions & 6 deletions config_spec.toml
Original file line number Diff line number Diff line change
Expand Up @@ -66,12 +66,6 @@ type = "usize"
doc = "Number of threads used for bulk indexing (default: use the # of CPUs)"
default = "0"

[[param]]
name = "tx_cache_size_mb"
type = "f32"
doc = "Total size of transactions to cache (MB)"
default = "10.0"

[[param]]
name = "blocktxids_cache_size_mb"
type = "f32"
Expand Down
2 changes: 0 additions & 2 deletions src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,6 @@ pub struct Config {
pub jsonrpc_import: bool,
pub index_batch_size: usize,
pub bulk_index_threads: usize,
pub tx_cache_size: usize,
pub txid_limit: usize,
pub blocktxids_cache_size: usize,
}
Expand Down Expand Up @@ -255,7 +254,6 @@ impl Config {
jsonrpc_import: config.jsonrpc_import,
index_batch_size: config.index_batch_size,
bulk_index_threads: config.bulk_index_threads,
tx_cache_size: (config.tx_cache_size_mb * MB) as usize,
blocktxids_cache_size: (config.blocktxids_cache_size_mb * MB) as usize,
txid_limit: config.txid_limit,
};
Expand Down
51 changes: 36 additions & 15 deletions src/rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use bitcoin_hashes::hex::{FromHex, ToHex};
use bitcoin_hashes::sha256d::Hash as Sha256dHash;
use error_chain::ChainedError;
use serde_json::{from_str, Value};
use std::collections::HashMap;
use std::io::{BufRead, BufReader, Write};
use std::net::{Shutdown, SocketAddr, TcpListener, TcpStream};
use std::sync::mpsc::SyncSender;
Expand Down Expand Up @@ -219,30 +220,50 @@ impl RPC {
pub fn start(addr: SocketAddr, query: Arc<Query>) -> RPC {
RPC {
server: Some(spawn_thread("rpc", move || {
let senders = Arc::new(Mutex::new(Vec::<SyncSender<Message>>::new()));
let senders = Arc::new(Mutex::new(HashMap::<i32, SyncSender<Message>>::new()));
let handles = Arc::new(Mutex::new(
HashMap::<i32, std::thread::JoinHandle<()>>::new(),
));

let acceptor = RPC::start_acceptor(addr);
let mut children = vec![];
let mut handle_count = 0;

while let Some((stream, addr)) = acceptor.receiver().recv().unwrap() {
let query = query.clone();
let senders = senders.clone();
children.push(spawn_thread("peer", move || {
info!("[{}] connected peer", addr);
let conn = Connection::new(query, stream, addr);
senders.lock().unwrap().push(conn.chan.sender());
conn.run();
info!("[{}] disconnected peer", addr);
}));
let handle_id = handle_count;
handle_count += 1;
// explicitely scope the shadowed variables for the new thread
let handle: thread::JoinHandle<()> = {
let query = Arc::clone(&query);
let senders = Arc::clone(&senders);
let handles = Arc::clone(&handles);

spawn_thread("peer", move || {
info!("[{}] connected peer #{}", addr, handle_id);
let conn = Connection::new(query, stream, addr);
senders
.lock()
.unwrap()
.insert(handle_id, conn.chan.sender());
conn.run();
info!("[{}] disconnected peer #{}", addr, handle_id);
senders.lock().unwrap().remove(&handle_id);
handles.lock().unwrap().remove(&handle_id);
})
};

handles.lock().unwrap().insert(handle_id, handle);
}

trace!("closing {} RPC connections", senders.lock().unwrap().len());
for sender in senders.lock().unwrap().iter() {
for sender in senders.lock().unwrap().values() {
let _ = sender.send(Message::Done);
}

trace!("waiting for {} RPC handling threads", children.len());
for child in children {
let _ = child.join();
trace!("waiting for {} RPC handling threads", handles.lock().unwrap().len());
for (_, handle) in handles.lock().unwrap().drain() {
if let Err(e) = handle.join() {
warn!("failed to join thread: {:?}", e);
}
}

trace!("RPC connections are closed");
Expand Down

0 comments on commit 871ac2e

Please sign in to comment.