Skip to content

Commit

Permalink
[bitcoind_rpc] Initial work on RPC example
Browse files Browse the repository at this point in the history
  • Loading branch information
evanlinjin committed Jun 19, 2023
1 parent 492b7a4 commit 26cc17f
Show file tree
Hide file tree
Showing 6 changed files with 323 additions and 5 deletions.
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ members = [
"crates/bitcoind_rpc",
"example-crates/example_cli",
"example-crates/example_electrum",
"example-crates/example_rpc",
"example-crates/wallet_electrum",
"example-crates/wallet_esplora",
"example-crates/wallet_esplora_async",
Expand Down
2 changes: 1 addition & 1 deletion crates/bitcoind_rpc/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
[package]
name = "bitcoind_rpc"
name = "bdk_bitcoind_rpc"
version = "0.1.0"
edition = "2021"

Expand Down
11 changes: 9 additions & 2 deletions crates/bitcoind_rpc/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use bdk_chain::{
local_chain::CheckPoint,
BlockId, ConfirmationHeightAnchor, ConfirmationTimeAnchor, TxGraph,
};
pub use bitcoincore_rpc;
use bitcoincore_rpc::{bitcoincore_rpc_json::GetBlockResult, Client, RpcApi};

#[derive(Debug, Clone)]
Expand Down Expand Up @@ -51,6 +52,10 @@ pub fn confirmation_time_anchor(
}

impl BitcoindRpcItem {
pub fn is_mempool(&self) -> bool {
matches!(self, Self::Mempool { .. })
}

pub fn into_update<K, A, F>(self, anchor: F) -> LocalUpdate<K, A>
where
A: Clone + Ord + PartialOrd,
Expand Down Expand Up @@ -145,15 +150,17 @@ impl<'a> BitcoindRpcIter<'a> {
// block is not in the main chain
continue 'cp_loop;
}

// agreement
// next loop
*last_cp = Some(cp);
*last_info = Some(info);
continue 'main_loop;
}

// no point of agreement found
// next loop will emit block @ fallback height
*last_cp = None;
*last_info = None;
}
(Some(last_cp), last_info @ Some(_)) => {
// find next block
Expand Down Expand Up @@ -212,7 +219,7 @@ impl<'a> BitcoindRpcIter<'a> {
}
}
}
(None, Some(_)) => unreachable!(),
(None, Some(info)) => unreachable!("got info with no checkpoint? info={:#?}", info),
}
}
}
Expand Down
3 changes: 1 addition & 2 deletions example-crates/example_electrum/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,7 @@ use example_cli::{
};

const DB_MAGIC: &[u8] = b"bdk_example_electrum";
const DB_PATH: &str = ".bdk_electrum_example.db";
// const ASSUME_FINAL_DEPTH: usize = 10;
const DB_PATH: &str = ".bdk_example_electrum.db";

#[derive(Subcommand, Debug, Clone)]
enum ElectrumCommands {
Expand Down
12 changes: 12 additions & 0 deletions example-crates/example_rpc/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
[package]
name = "example_rpc"
version = "0.1.0"
edition = "2021"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
bdk_chain = { path = "../../crates/chain", features = ["serde"] }
bdk_bitcoind_rpc = { path = "../../crates/bitcoind_rpc" }
example_cli = { path = "../example_cli" }
ctrlc = { version = "^2" }
299 changes: 299 additions & 0 deletions example-crates/example_rpc/src/main.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,299 @@
use std::{
path::PathBuf,
sync::{
atomic::{AtomicBool, Ordering},
mpsc::sync_channel,
Arc, Mutex,
},
time::{Duration, Instant, SystemTime},
};

use bdk_bitcoind_rpc::{
bitcoincore_rpc::{Auth, Client, RpcApi},
confirmation_time_anchor, BitcoindRpcItem, BitcoindRpcIter,
};
use bdk_chain::{
bitcoin::{Address, Transaction},
indexed_tx_graph::IndexedAdditions,
keychain::{LocalChangeSet, LocalUpdate},
local_chain::LocalChain,
Append, BlockId, ConfirmationTimeAnchor, IndexedTxGraph,
};
use example_cli::{
anyhow,
clap::{self, Args, Subcommand},
CoinSelectionAlgo, Keychain,
};

const DB_MAGIC: &[u8] = b"bdk_example_rpc";
const DB_PATH: &str = ".bdk_example_rpc.db";
const CHANNEL_BOUND: usize = 10;
const LIVE_POLL_DUR_SECS: Duration = Duration::from_secs(15);

type ChangeSet = LocalChangeSet<Keychain, ConfirmationTimeAnchor>;

#[derive(Args, Debug, Clone)]
struct RpcArgs {
/// RPC URL
#[clap(env = "RPC_URL", long, default_value = "127.0.0.1:8332")]
url: String,
/// RPC auth cookie file
#[clap(env = "RPC_COOKIE", long)]
rpc_cookie: Option<PathBuf>,
/// RPC auth username
#[clap(env = "RPC_USER", long)]
rpc_user: Option<String>,
/// RPC auth password
#[clap(env = "RPC_PASS", long)]
rpc_password: Option<String>,
}

impl From<RpcArgs> for Auth {
fn from(args: RpcArgs) -> Self {
match (args.rpc_cookie, args.rpc_user, args.rpc_password) {
(None, None, None) => Self::None,
(Some(path), _, _) => Self::CookieFile(path),
(_, Some(user), Some(pass)) => Self::UserPass(user, pass),
(_, Some(_), None) => panic!("rpc auth: missing rpc_pass"),
(_, None, Some(_)) => panic!("rpc auth: missing rpc_user"),
}
}
}

#[derive(Subcommand, Debug, Clone)]
enum RpcCommands {
/// Scans blocks via RPC (starting from last point of agreement) and stores/indexes relevant
/// transactions
Scan {
/// Starting block height to fallback to if no point of agreement if found
#[clap(env = "FALLBACK_HEIGHT", long, default_value = "0")]
fallback_height: u32,
/// The unused-scripts lookahead will be kept at this size
#[clap(long, default_value = "10")]
lookahead: u32,
/// Whether to be live!
#[clap(long, default_value = "false")]
live: bool,
#[clap(flatten)]
rpc_args: RpcArgs,
},
/// Create and broadcast a transaction.
Tx {
value: u64,
address: Address,
#[clap(short, default_value = "bnb")]
coin_select: CoinSelectionAlgo,
#[clap(flatten)]
rpc_args: RpcArgs,
},
}

impl RpcCommands {
fn rpc_args(&self) -> &RpcArgs {
match self {
RpcCommands::Scan { rpc_args, .. } => rpc_args,
RpcCommands::Tx { rpc_args, .. } => rpc_args,
}
}
}

fn main() -> anyhow::Result<()> {
let sigterm_flag = start_ctrlc_handler();

let (args, keymap, index, db, init_changeset) =
example_cli::init::<RpcCommands, ChangeSet>(DB_MAGIC, DB_PATH)?;

let graph = Mutex::new({
let mut graph = IndexedTxGraph::new(index);
graph.apply_additions(init_changeset.indexed_additions);
graph
});

let chain = Mutex::new(LocalChain::from_changeset(init_changeset.chain_changeset));

let rpc_cmd = match args.command {
example_cli::Commands::ChainSpecific(rpc_cmd) => rpc_cmd,
general_cmd => {
let res = example_cli::handle_commands(
&graph,
&db,
&chain,
&keymap,
args.network,
|_| Err(anyhow::anyhow!("use `tx` instead")),
general_cmd,
);
db.lock().unwrap().commit()?;
return res;
}
};

let rpc_client = {
let a = rpc_cmd.rpc_args();
Client::new(
&a.url,
match (&a.rpc_cookie, &a.rpc_user, &a.rpc_password) {
(None, None, None) => Auth::None,
(Some(path), _, _) => Auth::CookieFile(path.clone()),
(_, Some(user), Some(pass)) => Auth::UserPass(user.clone(), pass.clone()),
(_, Some(_), None) => panic!("rpc auth: missing rpc_pass"),
(_, None, Some(_)) => panic!("rpc auth: missing rpc_user"),
},
)?
};

match rpc_cmd {
RpcCommands::Scan {
fallback_height,
lookahead,
live,
..
} => {
graph.lock().unwrap().index.set_lookahead_for_all(lookahead);

let (chan, recv) = sync_channel::<(BitcoindRpcItem, u32)>(CHANNEL_BOUND);
let prev_cp = chain.lock().unwrap().tip();

let join_handle = std::thread::spawn(move || -> anyhow::Result<()> {
let mut tip_height = Option::<u32>::None;

for item in BitcoindRpcIter::new(&rpc_client, fallback_height, prev_cp) {
let item = item?;
let is_block = !item.is_mempool();
let is_mempool = item.is_mempool();

if tip_height.is_none() || !is_block {
tip_height = Some(rpc_client.get_block_count()? as u32);
}
chan.send((item, tip_height.expect("must have tip height")))?;

if sigterm_flag.load(Ordering::Acquire) {
return Ok(());
}
if is_mempool {
if !live {
return Ok(());
}
if await_flag(&sigterm_flag, LIVE_POLL_DUR_SECS) {
return Ok(());
}
}
}
unreachable!()
});

let mut start = Instant::now();

for (item, tip_height) in recv {
let is_mempool = item.is_mempool();
let update: LocalUpdate<Keychain, ConfirmationTimeAnchor> =
item.into_update(confirmation_time_anchor);
let current_height = update.tip.height();

let db_changeset = {
let mut chain = chain.lock().unwrap();
let mut graph = graph.lock().unwrap();

let chain_changeset = chain.apply_update(update.tip)?;

let mut indexed_additions =
IndexedAdditions::<ConfirmationTimeAnchor, _>::default();
let (_, index_additions) = graph.index.reveal_to_target_multi(&update.keychain);
indexed_additions.append(index_additions.into());
indexed_additions.append(graph.prune_and_apply_update(update.graph));

ChangeSet {
indexed_additions,
chain_changeset,
}
};

let mut db = db.lock().unwrap();
db.stage(db_changeset);

// print stuff every 3 seconds
if start.elapsed() >= Duration::from_secs(3) {
start = Instant::now();
let balance = {
let chain = chain.lock().unwrap();
let graph = graph.lock().unwrap();
graph.graph().balance(
&*chain,
chain.tip().map_or(BlockId::default(), |cp| cp.block_id()),
graph.index.outpoints().iter().cloned(),
|(k, _), _| k == &Keychain::Internal,
)
};
println!(
"* scanned_to: {} / {} tip | total: {} sats",
if is_mempool {
"mempool".to_string()
} else {
current_height.to_string()
},
tip_height,
balance.confirmed
+ balance.immature
+ balance.trusted_pending
+ balance.untrusted_pending
);
}
}

db.lock().unwrap().commit()?;
println!("commited to database!");

join_handle
.join()
.expect("failed to join chain source thread")
}
RpcCommands::Tx {
value,
address,
coin_select,
..
} => {
let chain = chain.lock().unwrap();
let broadcast = move |tx: &Transaction| -> anyhow::Result<()> {
rpc_client.send_raw_transaction(tx)?;
Ok(())
};
example_cli::run_send_cmd(
&graph,
&db,
&*chain,
&keymap,
coin_select,
address,
value,
broadcast,
)
}
}
}

fn start_ctrlc_handler() -> Arc<AtomicBool> {
let flag = Arc::new(AtomicBool::new(false));
let cloned_flag = flag.clone();

ctrlc::set_handler(move || cloned_flag.store(true, Ordering::Release));

flag
}

fn await_flag(flag: &AtomicBool, duration: Duration) -> bool {
let start = SystemTime::now();
loop {
if flag.load(Ordering::Acquire) {
return true;
}
if SystemTime::now()
.duration_since(start)
.expect("should succeed")
>= duration
{
return false;
}
std::thread::sleep(Duration::from_secs(1));
}
}

0 comments on commit 26cc17f

Please sign in to comment.