Skip to content

Commit

Permalink
Wallet and EsploraBlockchain are thread-safe*+
Browse files Browse the repository at this point in the history
*If the user intends to use the `bdk::Wallet` concurrently, the user MUST
wrap `bdk::Wallet` with a `Mutex` of some sort. This is because BDK's
calls to `RefCell::borrow` and `RefCell::borrow_mut` assumed (implicitly
due to `bdk::Wallet` not being `Send` until this commit) that the user
was running `bdk::Wallet` in a single thread. Failure to keep calls to
`bdk::Wallet` mutually exclusive will result in panics when BDK
internally calls `try_lock().unwrap()` from multiple threads at once.

+If the user intends to use BDK using the async interface, the `Mutex`
that wraps `bdk::Wallet` must additionally be async-aware, such as
`futures::lock::Mutex` or `tokio::sync::Mutex`. This is because
`bdk::Wallet::sync` is async, and the lock on the `bdk::Wallet` needs to
be held across `.await`, and therefore the `MutexGuard` must be `Send` +
`Sync`.
  • Loading branch information
MaxFangX committed Jan 25, 2023
1 parent c5b2f5a commit ddc84ca
Show file tree
Hide file tree
Showing 10 changed files with 64 additions and 25 deletions.
4 changes: 3 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,8 @@ readme = "README.md"
license = "MIT OR Apache-2.0"

[dependencies]
bdk-macros = "^0.6"
# bdk-macros = "^0.6"
bdk-macros = { path = "macros", version = "^0.6" }
log = "^0.4"
miniscript = { version = "8.0", features = ["serde"] }
bitcoin = { version = "0.29.1", features = ["serde", "base64", "rand"] }
Expand Down Expand Up @@ -111,6 +112,7 @@ electrsd = "0.21"
# Move back to importing from rust-bitcoin once https://github.com/rust-bitcoin/rust-bitcoin/pull/1342 is released
base64 = "^0.13"
assert_matches = "1.5.0"
tokio = "1"

[[example]]
name = "compact_filters_balance"
Expand Down
2 changes: 1 addition & 1 deletion examples/utils/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ pub(crate) mod tx {
use bdk::{database::BatchDatabase, SignOptions, Wallet};
use bitcoin::{Address, Transaction};

pub fn build_signed_tx<D: BatchDatabase>(
pub fn build_signed_tx<D: BatchDatabase + Send>(
wallet: &Wallet<D>,
recipient_address: &str,
amount: u64,
Expand Down
4 changes: 2 additions & 2 deletions macros/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ fn add_async_trait(mut parsed: ItemTrait) -> TokenStream {
#output

#[cfg(any(target_arch = "wasm32", feature = "async-interface"))]
#[async_trait(?Send)]
#[async_trait]
#parsed
};

Expand Down Expand Up @@ -74,7 +74,7 @@ fn add_async_impl_trait(mut parsed: ItemImpl) -> TokenStream {
#output

#[cfg(any(target_arch = "wasm32", feature = "async-interface"))]
#[async_trait(?Send)]
#[async_trait]
#parsed
};

Expand Down
4 changes: 2 additions & 2 deletions src/blockchain/any.rs
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ impl GetBlockHash for AnyBlockchain {

#[maybe_async]
impl WalletSync for AnyBlockchain {
fn wallet_sync<D: BatchDatabase>(
fn wallet_sync<D: BatchDatabase + Send + Sync>(
&self,
database: &mut D,
progress_update: Box<dyn Progress>,
Expand All @@ -142,7 +142,7 @@ impl WalletSync for AnyBlockchain {
))
}

fn wallet_setup<D: BatchDatabase>(
fn wallet_setup<D: BatchDatabase + Send + Sync>(
&self,
database: &mut D,
progress_update: Box<dyn Progress>,
Expand Down
39 changes: 38 additions & 1 deletion src/blockchain/esplora/async.rs
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ impl GetBlockHash for EsploraBlockchain {

#[maybe_async]
impl WalletSync for EsploraBlockchain {
fn wallet_setup<D: BatchDatabase>(
fn wallet_setup<D: BatchDatabase + Send + Sync>(
&self,
database: &mut D,
_progress_update: Box<dyn Progress>,
Expand Down Expand Up @@ -248,3 +248,40 @@ impl ConfigurableBlockchain for EsploraBlockchain {
Ok(blockchain)
}
}

#[cfg(test)]
mod test {
use std::sync::Arc;

use bitcoin::Network;

use crate::database::{AnyDatabase, MemoryDatabase};
use crate::wallet::test::get_test_wpkh;
use crate::SyncOptions;
use super::*;

/// A quick test that doesn't actually do anything, but will throw a compile
/// error if bdk::Wallet or EsploraBlockchain are not thread-safe.
#[tokio::test]
async fn esplora_async_wallet_is_thread_safe() {
tokio::task::spawn(async move {
let descriptors = testutils!(@descriptors (get_test_wpkh()));
let wallet = Wallet::new(
&descriptors.0,
None,
Network::Regtest,
AnyDatabase::Memory(MemoryDatabase::new()),
)
.unwrap();
let wrapped_wallet = Arc::new(tokio::sync::Mutex::new(wallet));

let esplora = EsploraBlockchain::new("localhost:8000", 20);
let sync_options = SyncOptions { progress: None };
let _ = wrapped_wallet
.lock()
.await
.sync(&esplora, sync_options)
.await;
});
}
}
26 changes: 13 additions & 13 deletions src/blockchain/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ pub trait WalletSync {
/// For types that do not have that distinction, only this method can be implemented, since
/// [`WalletSync::wallet_sync`] defaults to calling this internally if not overridden.
/// Populate the internal database with transactions and UTXOs
fn wallet_setup<D: BatchDatabase>(
fn wallet_setup<D: BatchDatabase + Send + Sync>(
&self,
database: &mut D,
progress_update: Box<dyn Progress>,
Expand All @@ -154,7 +154,7 @@ pub trait WalletSync {
/// [`BatchOperations::set_tx`]: crate::database::BatchOperations::set_tx
/// [`BatchOperations::set_utxo`]: crate::database::BatchOperations::set_utxo
/// [`BatchOperations::del_utxo`]: crate::database::BatchOperations::del_utxo
fn wallet_sync<D: BatchDatabase>(
fn wallet_sync<D: BatchDatabase + Send + Sync>(
&self,
database: &mut D,
progress_update: Box<dyn Progress>,
Expand Down Expand Up @@ -210,7 +210,7 @@ fn sum_of_balances<B: BlockchainFactory>(blockchain_factory: B, wallets: &[Walle
)]
pub trait BlockchainFactory {
/// The type returned when building a blockchain from this factory
type Inner: Blockchain;
type Inner: Blockchain + Send + Sync;

/// Build a new blockchain for the given descriptor wallet_name
///
Expand All @@ -227,7 +227,7 @@ pub trait BlockchainFactory {
///
/// Internally uses [`wallet_name_from_descriptor`] to derive the name, and then calls
/// [`BlockchainFactory::build`] to create the blockchain instance.
fn build_for_wallet<D: BatchDatabase>(
fn build_for_wallet<D: BatchDatabase + Send + Sync>(
&self,
wallet: &Wallet<D>,
override_skip_blocks: Option<u32>,
Expand All @@ -253,7 +253,7 @@ pub trait BlockchainFactory {
docsrs,
doc(cfg(not(any(target_arch = "wasm32", feature = "async-interface"))))
)]
fn sync_wallet<D: BatchDatabase>(
fn sync_wallet<D: BatchDatabase + Send>(
&self,
wallet: &Wallet<D>,
override_skip_blocks: Option<u32>,
Expand All @@ -264,7 +264,7 @@ pub trait BlockchainFactory {
}
}

impl<T: StatelessBlockchain> BlockchainFactory for Arc<T> {
impl<T: StatelessBlockchain + Send + Sync> BlockchainFactory for Arc<T> {
type Inner = Self;

fn build(&self, _wallet_name: &str, _override_skip_blocks: Option<u32>) -> Result<Self, Error> {
Expand Down Expand Up @@ -338,7 +338,7 @@ impl Progress for LogProgress {
}

#[maybe_async]
impl<T: Blockchain> Blockchain for Arc<T> {
impl<T: Blockchain + Send + Sync> Blockchain for Arc<T> {
fn get_capabilities(&self) -> HashSet<Capability> {
maybe_await!(self.deref().get_capabilities())
}
Expand All @@ -353,37 +353,37 @@ impl<T: Blockchain> Blockchain for Arc<T> {
}

#[maybe_async]
impl<T: GetTx> GetTx for Arc<T> {
impl<T: GetTx + Send + Sync> GetTx for Arc<T> {
fn get_tx(&self, txid: &Txid) -> Result<Option<Transaction>, Error> {
maybe_await!(self.deref().get_tx(txid))
}
}

#[maybe_async]
impl<T: GetHeight> GetHeight for Arc<T> {
impl<T: GetHeight + Send + Sync> GetHeight for Arc<T> {
fn get_height(&self) -> Result<u32, Error> {
maybe_await!(self.deref().get_height())
}
}

#[maybe_async]
impl<T: GetBlockHash> GetBlockHash for Arc<T> {
impl<T: GetBlockHash + Send + Sync> GetBlockHash for Arc<T> {
fn get_block_hash(&self, height: u64) -> Result<BlockHash, Error> {
maybe_await!(self.deref().get_block_hash(height))
}
}

#[maybe_async]
impl<T: WalletSync> WalletSync for Arc<T> {
fn wallet_setup<D: BatchDatabase>(
impl<T: WalletSync + Send + Sync> WalletSync for Arc<T> {
fn wallet_setup<D: BatchDatabase + Send + Sync>(
&self,
database: &mut D,
progress_update: Box<dyn Progress>,
) -> Result<(), Error> {
maybe_await!(self.deref().wallet_setup(database, progress_update))
}

fn wallet_sync<D: BatchDatabase>(
fn wallet_sync<D: BatchDatabase + Send + Sync>(
&self,
database: &mut D,
progress_update: Box<dyn Progress>,
Expand Down
2 changes: 1 addition & 1 deletion src/database/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ pub trait Database: BatchOperations {
/// This trait defines the methods to start and apply a batch of operations.
pub trait BatchDatabase: Database {
/// Container for the operations
type Batch: BatchOperations;
type Batch: BatchOperations + Send;

/// Create a new batch container
fn begin_batch(&self) -> Self::Batch;
Expand Down
2 changes: 1 addition & 1 deletion src/wallet/export.rs
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ impl FullyNodedExport {
///
/// If the database is empty or `include_blockheight` is false, the `blockheight` field
/// returned will be `0`.
pub fn export_wallet<D: BatchDatabase>(
pub fn export_wallet<D: BatchDatabase + Send + Sync>(
wallet: &Wallet<D>,
label: &str,
include_blockheight: bool,
Expand Down
4 changes: 2 additions & 2 deletions src/wallet/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ pub struct SyncOptions {

impl<D> Wallet<D>
where
D: BatchDatabase,
D: BatchDatabase + Send + Sync,
{
#[deprecated = "Just use Wallet::new -- all wallets are offline now!"]
/// Create a new "offline" wallet
Expand Down Expand Up @@ -1687,7 +1687,7 @@ where

/// Sync the internal database with the blockchain
#[maybe_async]
pub fn sync<B: WalletSync + GetHeight>(
pub fn sync<B: WalletSync + GetHeight + Send + Sync>(
&self,
blockchain: &B,
sync_opts: SyncOptions,
Expand Down
2 changes: 1 addition & 1 deletion src/wallet/tx_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@ impl<'a, Cs: Clone, Ctx, D> Clone for TxBuilder<'a, D, Cs, Ctx> {
}

// methods supported by both contexts, for any CoinSelectionAlgorithm
impl<'a, D: BatchDatabase, Cs: CoinSelectionAlgorithm<D>, Ctx: TxBuilderContext>
impl<'a, D: BatchDatabase + Send + Sync, Cs: CoinSelectionAlgorithm<D>, Ctx: TxBuilderContext>
TxBuilder<'a, D, Cs, Ctx>
{
/// Set a custom fee rate
Expand Down

0 comments on commit ddc84ca

Please sign in to comment.