Skip to content

Commit

Permalink
Merge pull request #645 from TheCharlatan/tor
Browse files Browse the repository at this point in the history
Add Support for electrum client Tor proxy
  • Loading branch information
zkao authored Aug 11, 2022
2 parents b3effef + 405ffca commit 1ef68fa
Show file tree
Hide file tree
Showing 3 changed files with 83 additions and 41 deletions.
11 changes: 3 additions & 8 deletions src/farcasterd/runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2210,14 +2210,9 @@ pub fn launch(

// Forward tor proxy argument
let parsed = Opts::parse();
match &parsed.shared.tor_proxy {
Some(None) => {
cmd.args(&["-T"]);
}
Some(Some(val)) => {
cmd.args(&["-T", &format!("{}", val)]);
}
_ => (),
info!("tor opts: {:?}", parsed.shared.tor_proxy);
if let Some(t) = &matches.value_of("tor-proxy") {
cmd.args(&["-T", &format!("{}", t)]);
}

// Given specialized args in launch
Expand Down
12 changes: 5 additions & 7 deletions src/opts.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@ pub const FARCASTER_DATA_DIR: &str = ".";
pub const FARCASTER_MSG_SOCKET_NAME: &str = "{data_dir}/msg";
pub const FARCASTER_CTL_SOCKET_NAME: &str = "{data_dir}/ctl";

pub const FARCASTER_TOR_PROXY: &str = "127.0.0.1:9050";
pub const FARCASTER_KEY_FILE: &str = "{data_dir}/key.dat";

/// Shared options used by different binaries
Expand All @@ -58,19 +57,18 @@ pub struct Opts {

/// Use Tor
///
/// If set, specifies SOCKS5 proxy used for Tor connectivity and directs
/// all network traffic through Tor network.
/// If the argument is provided in form of flag, without value, uses
/// `127.0.0.1:9050` as default Tor proxy address.
/// If set, specifies SOCKS5 proxy used for Tor connectivity and directs all
/// network traffic through Tor network. On most systems this is
/// 127.0.0.1:9050 by default.
#[clap(
short = 'T',
long,
alias = "tor",
global = true,
env = "FARCASTER_TOR_PROXY",
value_hint = ValueHint::Hostname
value_hint = ValueHint::Hostname,
)]
pub tor_proxy: Option<Option<SocketAddr>>,
pub tor_proxy: Option<SocketAddr>,

/// ZMQ socket name/address to forward all incoming protocol messages
///
Expand Down
101 changes: 75 additions & 26 deletions src/syncerd/bitcoin_syncer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,10 @@ use bitcoin::hashes::{
};
use bitcoin::BlockHash;
use bitcoin::Script;
use electrum_client::Hex32Bytes;
use electrum_client::{raw_client::ElectrumSslStream, HeaderNotification};
use electrum_client::{raw_client::RawClient, GetHistoryRes};
use electrum_client::{Client, ElectrumApi};
use electrum_client::{
raw_client::RawClient, Client, ConfigBuilder, ElectrumApi, GetHistoryRes, HeaderNotification,
Hex32Bytes, Socks5Config,
};
use farcaster_core::bitcoin::segwitv0::signature_hash;
use farcaster_core::bitcoin::transaction::TxInRef;
use farcaster_core::blockchain::{Blockchain, Network};
Expand Down Expand Up @@ -82,9 +82,26 @@ pub struct AddressNotif {
txs: Vec<AddressTx>,
}

fn create_electrum_client(
electrum_server: &str,
proxy_address: Option<String>,
) -> Result<Client, electrum_client::Error> {
if let Some(proxy_address) = proxy_address {
let proxy = Socks5Config::new(proxy_address);
let config = ConfigBuilder::new().socks5(Some(proxy)).unwrap().build();
Client::from_config(electrum_server, config)
} else {
Client::new(electrum_server)
}
}

impl ElectrumRpc {
fn new(electrum_server: &str, polling: bool) -> Result<Self, electrum_client::Error> {
let client = Client::new(electrum_server)?;
fn new(
electrum_server: &str,
polling: bool,
proxy_address: Option<String>,
) -> Result<Self, electrum_client::Error> {
let client = create_electrum_client(electrum_server, proxy_address)?;
let header = client.block_headers_subscribe()?;
debug!("New ElectrumRpc at height {:?}", header.height);

Expand Down Expand Up @@ -637,11 +654,12 @@ async fn run_syncerd_task_receiver(
fn address_polling(
state: Arc<Mutex<SyncerState>>,
electrum_server: String,
proxy_address: Option<String>,
polling: bool,
) -> tokio::task::JoinHandle<()> {
tokio::task::spawn(async move {
loop {
let mut rpc = match ElectrumRpc::new(&electrum_server, polling) {
let mut rpc = match ElectrumRpc::new(&electrum_server, polling, proxy_address.clone()) {
Ok(client) => client,
Err(err) => {
error!(
Expand Down Expand Up @@ -710,12 +728,13 @@ fn address_polling(
fn height_polling(
state: Arc<Mutex<SyncerState>>,
electrum_server: String,
proxy_address: Option<String>,
polling: bool,
) -> tokio::task::JoinHandle<()> {
tokio::task::spawn(async move {
// outer loop ensures the polling restarts if there is an error
loop {
let mut rpc = match ElectrumRpc::new(&electrum_server, polling) {
let mut rpc = match ElectrumRpc::new(&electrum_server, polling, proxy_address.clone()) {
Ok(client) => client,
Err(err) => {
error!(
Expand Down Expand Up @@ -775,12 +794,13 @@ fn height_polling(
fn unseen_transaction_polling(
state: Arc<Mutex<SyncerState>>,
electrum_server: String,
proxy_address: Option<String>,
polling: bool,
) -> tokio::task::JoinHandle<()> {
tokio::task::spawn(async move {
// outer loop ensures the polling restarts if there is an error
loop {
let rpc = match ElectrumRpc::new(&electrum_server, polling) {
let rpc = match ElectrumRpc::new(&electrum_server, polling, proxy_address.clone()) {
Ok(client) => client,
Err(err) => {
error!(
Expand Down Expand Up @@ -811,14 +831,17 @@ impl BitcoinSyncer {

fn transaction_broadcasting(
electrum_server: String,
proxy_address: Option<String>,
mut transaction_broadcast_rx: TokioReceiver<(BroadcastTransaction, ServiceId)>,
tx_event: TokioSender<SyncerdBridgeEvent>,
) -> tokio::task::JoinHandle<()> {
tokio::task::spawn(async move {
while let Some((broadcast_transaction, source)) = transaction_broadcast_rx.recv().await {
match Client::new(&electrum_server).and_then(|broadcast_client| {
broadcast_client.transaction_broadcast_raw(&broadcast_transaction.tx.clone())
}) {
match create_electrum_client(&electrum_server, proxy_address.clone()).and_then(
|broadcast_client| {
broadcast_client.transaction_broadcast_raw(&broadcast_transaction.tx.clone())
},
) {
Ok(txid) => {
tx_event
.send(SyncerdBridgeEvent {
Expand Down Expand Up @@ -854,13 +877,14 @@ fn transaction_broadcasting(

fn estimate_fee_polling(
electrum_server: String,
proxy_address: Option<String>,
state: Arc<Mutex<SyncerState>>,
) -> tokio::task::JoinHandle<()> {
tokio::task::spawn(async move {
let high_priority_confs = 2;
let low_priority_confs = 6;
loop {
if let Ok(client) = Client::new(&electrum_server) {
if let Ok(client) = create_electrum_client(&electrum_server, proxy_address.clone()) {
loop {
match client
.estimate_fee(high_priority_confs) // docs say sat/kB, but its BTC/kvB
Expand Down Expand Up @@ -897,6 +921,7 @@ fn estimate_fee_polling(
fn sweep_polling(
state: Arc<Mutex<SyncerState>>,
electrum_server: String,
proxy_address: Option<String>,
network: bitcoin::Network,
) -> tokio::task::JoinHandle<()> {
tokio::task::spawn(async move {
Expand All @@ -905,7 +930,7 @@ fn sweep_polling(
let sweep_addresses = state_guard.sweep_addresses.clone();
drop(state_guard);
if !sweep_addresses.is_empty() {
match Client::new(&electrum_server) {
match create_electrum_client(&electrum_server, proxy_address.clone()) {
Err(err) => {
error!(
"Failed to create btc sweep electrum client: {}, retrying",
Expand Down Expand Up @@ -950,15 +975,18 @@ fn sweep_polling(

fn transaction_fetcher(
electrum_server: String,
proxy_address: Option<String>,
mut transaction_get_rx: TokioReceiver<(GetTx, ServiceId)>,
tx_event: TokioSender<SyncerdBridgeEvent>,
) -> tokio::task::JoinHandle<()> {
tokio::task::spawn(async move {
while let Some((get_transaction, source)) = transaction_get_rx.recv().await {
match Client::new(&electrum_server).and_then(|transaction_client| {
transaction_client
.transaction_get(&bitcoin::Txid::from_slice(&get_transaction.hash).unwrap())
}) {
match create_electrum_client(&electrum_server, proxy_address.clone()).and_then(
|transaction_client| {
transaction_client
.transaction_get(&bitcoin::Txid::from_slice(&get_transaction.hash).unwrap())
},
) {
Ok(tx) => {
tx_event
.send(SyncerdBridgeEvent {
Expand Down Expand Up @@ -1004,6 +1032,9 @@ impl Synclet for BitcoinSyncer {
polling: bool,
) -> Result<(), Error> {
let btc_network = network.into();
let proxy_address = opts.shared.tor_proxy.map(|address| address.to_string());
info!("using proxy: {:?}", proxy_address);

if let Some(electrum_server) = &opts.electrum_server {
let electrum_server = electrum_server.clone();
std::thread::spawn(move || {
Expand Down Expand Up @@ -1040,35 +1071,53 @@ impl Synclet for BitcoinSyncer {
.await;
run_syncerd_bridge_event_sender(tx, event_rx, syncer_address).await;

let address_handle =
address_polling(Arc::clone(&state), electrum_server.clone(), polling);
let address_handle = address_polling(
Arc::clone(&state),
electrum_server.clone(),
proxy_address.clone(),
polling,
);

let height_handle =
height_polling(Arc::clone(&state), electrum_server.clone(), polling);
let height_handle = height_polling(
Arc::clone(&state),
electrum_server.clone(),
proxy_address.clone(),
polling,
);

let unseen_transaction_handle = unseen_transaction_polling(
Arc::clone(&state),
electrum_server.clone(),
proxy_address.clone(),
polling,
);

let transaction_broadcast_handle = transaction_broadcasting(
electrum_server.clone(),
proxy_address.clone(),
transaction_broadcast_rx,
event_tx.clone(),
);

let transaction_get_handle = transaction_fetcher(
electrum_server.clone(),
proxy_address.clone(),
transaction_get_rx,
event_tx.clone(),
);

let estimate_fee_handle =
estimate_fee_polling(electrum_server.clone(), Arc::clone(&state));
let estimate_fee_handle = estimate_fee_polling(
electrum_server.clone(),
proxy_address.clone(),
Arc::clone(&state),
);

let sweep_handle =
sweep_polling(Arc::clone(&state), electrum_server, btc_network);
let sweep_handle = sweep_polling(
Arc::clone(&state),
electrum_server.clone(),
proxy_address.clone(),
btc_network,
);

let res = tokio::try_join!(
address_handle,
Expand Down

0 comments on commit 1ef68fa

Please sign in to comment.