diff --git a/ckb-bin/src/subcommand/replay.rs b/ckb-bin/src/subcommand/replay.rs index 42bc0080de..51333b0fa2 100644 --- a/ckb-bin/src/subcommand/replay.rs +++ b/ckb-bin/src/subcommand/replay.rs @@ -19,7 +19,7 @@ pub fn replay(args: ReplayArgs, async_handle: Handle) -> Result<(), ExitCode> { )?; let (shared, _) = shared_builder .consensus(args.consensus.clone()) - .tx_pool_config(args.config.tx_pool) + .tx_pool_config(args.config.tx_pool.clone()) .build()?; if !args.tmp_target.is_dir() { diff --git a/ckb-bin/src/subcommand/run.rs b/ckb-bin/src/subcommand/run.rs index c55e1809cc..dea925f5f2 100644 --- a/ckb-bin/src/subcommand/run.rs +++ b/ckb-bin/src/subcommand/run.rs @@ -63,7 +63,10 @@ pub fn run(args: RunArgs, version: Version, async_handle: Handle) -> Result<(), exit_handler.wait_for_exit(); info!("Finishing work, please wait..."); + shared.tx_pool_controller().save_pool().map_err(|err| { + eprintln!("TxPool Error: {}", err); + ExitCode::Failure + })?; drop(rpc_server); - Ok(()) } diff --git a/test/Cargo.toml b/test/Cargo.toml index 65b09fba2a..b460451d58 100644 --- a/test/Cargo.toml +++ b/test/Cargo.toml @@ -39,6 +39,9 @@ lazy_static = "1.4.0" byteorder = "1.3.1" jsonrpc-core = "18.0" +[target.'cfg(not(target_os="windows"))'.dependencies] +nix = "0.20.0" + # Prevent this from interfering with workspaces [workspace] members = ["."] diff --git a/test/src/main.rs b/test/src/main.rs index c1a3658e06..3e1c95dacf 100644 --- a/test/src/main.rs +++ b/test/src/main.rs @@ -382,6 +382,8 @@ fn all_specs() -> Vec> { Box::new(TemplateSizeLimit), Box::new(PoolReconcile), Box::new(PoolResurrect), + #[cfg(not(target_os = "windows"))] + Box::new(PoolPersisted), Box::new(TransactionRelayBasic), Box::new(TransactionRelayLowFeeRate), // TODO failed on poor CI server diff --git a/test/src/node.rs b/test/src/node.rs index 88799f7345..88e1b395b9 100644 --- a/test/src/node.rs +++ b/test/src/node.rs @@ -26,15 +26,20 @@ use std::process::{self, Child, Command, Stdio}; use std::thread::sleep; use std::time::{Duration, Instant}; -struct ProcessGuard(pub Child); +struct ProcessGuard { + pub child: Child, + pub killed: bool, +} impl Drop for ProcessGuard { fn drop(&mut self) { - match self.0.kill() { - Err(e) => error!("Could not kill ckb process: {}", e), - Ok(_) => debug!("Successfully killed ckb process"), + if !self.killed { + match self.child.kill() { + Err(e) => error!("Could not kill ckb process: {}", e), + Ok(_) => debug!("Successfully killed ckb process"), + } + let _ = self.child.wait(); } - let _ = self.0.wait(); } } @@ -532,7 +537,10 @@ impl Node { } }; - self.guard = Some(ProcessGuard(child_process)); + self.guard = Some(ProcessGuard { + child: child_process, + killed: false, + }); self.node_id = Some(node_info.node_id); } @@ -540,6 +548,22 @@ impl Node { drop(self.guard.take()) } + #[cfg(not(target_os = "windows"))] + pub fn stop_gracefully(&mut self) { + if let Some(mut guard) = self.guard.take() { + if !guard.killed { + // send SIGINT to the child + nix::sys::signal::kill( + nix::unistd::Pid::from_raw(guard.child.id() as i32), + nix::sys::signal::Signal::SIGINT, + ) + .expect("cannot send ctrl-c"); + let _ = guard.child.wait(); + guard.killed = true; + } + } + } + pub fn export(&self, target: String) { Command::new(binary()) .args(&[ diff --git a/test/src/specs/tx_pool/mod.rs b/test/src/specs/tx_pool/mod.rs index 9488d8f6d2..a56c3a1504 100644 --- a/test/src/specs/tx_pool/mod.rs +++ b/test/src/specs/tx_pool/mod.rs @@ -5,6 +5,8 @@ mod depend_tx_in_same_block; mod descendant; mod different_txs_with_same_input; mod limit; +#[cfg(not(target_os = "windows"))] +mod pool_persisted; mod pool_reconcile; mod pool_resurrect; mod proposal_expire_rule; @@ -27,6 +29,8 @@ pub use depend_tx_in_same_block::*; pub use descendant::*; pub use different_txs_with_same_input::*; pub use limit::*; +#[cfg(not(target_os = "windows"))] +pub use pool_persisted::*; pub use pool_reconcile::*; pub use pool_resurrect::*; pub use proposal_expire_rule::*; diff --git a/test/src/specs/tx_pool/pool_persisted.rs b/test/src/specs/tx_pool/pool_persisted.rs new file mode 100644 index 0000000000..c19fc9b45e --- /dev/null +++ b/test/src/specs/tx_pool/pool_persisted.rs @@ -0,0 +1,65 @@ +use crate::util::mining::{mine, mine_until_out_bootstrap_period}; +use crate::{Node, Spec}; +use ckb_logger::info; + +pub struct PoolPersisted; + +impl Spec for PoolPersisted { + crate::setup!(num_nodes: 1); + + fn run(&self, nodes: &mut Vec) { + let node0 = &mut nodes[0]; + + info!("Generate 1 block on node0"); + mine_until_out_bootstrap_period(node0); + + info!("Generate 6 txs on node0"); + let mut hash = node0.generate_transaction(); + + (0..5).for_each(|_| { + let tx = node0.new_transaction(hash.clone()); + hash = node0.rpc_client().send_transaction(tx.data().into()); + }); + + info!("Generate 1 more blocks on node0"); + mine(node0, 1); + + info!("Generate 5 more txs on node0"); + (0..5).for_each(|_| { + let tx = node0.new_transaction(hash.clone()); + hash = node0.rpc_client().send_transaction(tx.data().into()); + }); + + info!("Generate 1 more blocks on node0"); + mine(node0, 1); + + node0.wait_for_tx_pool(); + + let tx_pool_info_original = node0.get_tip_tx_pool_info(); + + info!("Stop node0 gracefully"); + node0.stop_gracefully(); + + info!("Start node0"); + node0.start(); + + let tx_pool_info_reloaded = node0.get_tip_tx_pool_info(); + info!("TxPool should be same as before"); + info!("tx_pool_info_original: {:?}", tx_pool_info_original); + info!("tx_pool_info_reloaded: {:?}", tx_pool_info_reloaded); + assert_eq!( + tx_pool_info_original.proposed, + tx_pool_info_reloaded.proposed + ); + assert_eq!(tx_pool_info_original.orphan, tx_pool_info_reloaded.orphan); + assert_eq!(tx_pool_info_original.pending, tx_pool_info_reloaded.pending); + assert_eq!( + tx_pool_info_original.total_tx_size, + tx_pool_info_reloaded.total_tx_size + ); + assert_eq!( + tx_pool_info_original.total_tx_cycles, + tx_pool_info_reloaded.total_tx_cycles + ); + } +} diff --git a/tx-pool/src/lib.rs b/tx-pool/src/lib.rs index 7c84b2addf..1a8bac6bbb 100644 --- a/tx-pool/src/lib.rs +++ b/tx-pool/src/lib.rs @@ -6,6 +6,7 @@ mod callback; mod chunk_process; mod component; pub mod error; +mod persisted; pub mod pool; mod process; pub mod service; diff --git a/tx-pool/src/persisted.rs b/tx-pool/src/persisted.rs new file mode 100644 index 0000000000..6c5ae7f334 --- /dev/null +++ b/tx-pool/src/persisted.rs @@ -0,0 +1,97 @@ +use crate::TxPool; +use ckb_error::{AnyError, OtherError}; +use ckb_types::{ + core::TransactionView, + packed::{TransactionVec, TransactionVecReader}, + prelude::*, +}; +use std::{ + fs::OpenOptions, + io::{Read as _, Write as _}, +}; + +/// The version of the persisted tx-pool data. +pub(crate) const VERSION: u32 = 1; + +impl TxPool { + pub(crate) fn load_from_file(&self) -> Result, AnyError> { + let mut persisted_data_file = self.config.persisted_data.clone(); + persisted_data_file.set_extension(format!("v{}", VERSION)); + + if persisted_data_file.exists() { + let mut file = OpenOptions::new() + .read(true) + .open(&persisted_data_file) + .map_err(|err| { + let errmsg = format!( + "Failed to open the tx-pool persisted data file [{:?}], cause: {}", + persisted_data_file, err + ); + OtherError::new(errmsg) + })?; + let mut buffer = Vec::new(); + file.read_to_end(&mut buffer).map_err(|err| { + let errmsg = format!( + "Failed to read the tx-pool persisted data file [{:?}], cause: {}", + persisted_data_file, err + ); + OtherError::new(errmsg) + })?; + + let persisted_data = TransactionVecReader::from_slice(&buffer) + .map_err(|err| { + let errmsg = format!( + "The tx-pool persisted data file [{:?}] is broken, cause: {}", + persisted_data_file, err + ); + OtherError::new(errmsg) + })? + .to_entity(); + + Ok(persisted_data + .into_iter() + .map(|tx| tx.into_view()) + .collect()) + } else { + Ok(Vec::new()) + } + } + + pub(crate) fn save_into_file(&mut self) -> Result<(), AnyError> { + let mut persisted_data_file = self.config.persisted_data.clone(); + persisted_data_file.set_extension(format!("v{}", VERSION)); + + let mut file = OpenOptions::new() + .create(true) + .write(true) + .truncate(true) + .open(&persisted_data_file) + .map_err(|err| { + let errmsg = format!( + "Failed to open the tx-pool persisted data file [{:?}], cause: {}", + persisted_data_file, err + ); + OtherError::new(errmsg) + })?; + + let txs = TransactionVec::new_builder() + .extend(self.drain_all_transactions().iter().map(|tx| tx.data())) + .build(); + + file.write_all(txs.as_slice()).map_err(|err| { + let errmsg = format!( + "Failed to write the tx-pool persisted data into file [{:?}], cause: {}", + persisted_data_file, err + ); + OtherError::new(errmsg) + })?; + file.sync_all().map_err(|err| { + let errmsg = format!( + "Failed to sync the tx-pool persisted data file [{:?}], cause: {}", + persisted_data_file, err + ); + OtherError::new(errmsg) + })?; + Ok(()) + } +} diff --git a/tx-pool/src/pool.rs b/tx-pool/src/pool.rs index 818347e976..ba9853b930 100644 --- a/tx-pool/src/pool.rs +++ b/tx-pool/src/pool.rs @@ -93,7 +93,6 @@ impl TxPool { const COMMITTED_HASH_CACHE_SIZE: usize = 100_000; TxPool { - config, pending: PendingQueue::new(), gap: PendingQueue::new(), proposed: ProposedPool::new(config.max_ancestors_count), @@ -101,6 +100,7 @@ impl TxPool { last_txs_updated_at, total_tx_size: 0, total_tx_cycles: 0, + config, snapshot, } } diff --git a/tx-pool/src/process.rs b/tx-pool/src/process.rs index 964f238d09..19b14a41eb 100644 --- a/tx-pool/src/process.rs +++ b/tx-pool/src/process.rs @@ -982,10 +982,17 @@ impl TxPoolService { pub(crate) async fn clear_pool(&mut self, new_snapshot: Arc) { let mut tx_pool = self.tx_pool.write().await; - let config = tx_pool.config; + let config = tx_pool.config.clone(); self.last_txs_updated_at = Arc::new(AtomicU64::new(0)); *tx_pool = TxPool::new(config, new_snapshot, Arc::clone(&self.last_txs_updated_at)); } + + pub(crate) async fn save_pool(&mut self) { + let mut tx_pool = self.tx_pool.write().await; + if let Err(err) = tx_pool.save_into_file() { + error!("failed to save pool, error: {:?}", err) + } + } } type PreCheckedTx = ( diff --git a/tx-pool/src/service.rs b/tx-pool/src/service.rs index 86eba0058d..47f18cd106 100644 --- a/tx-pool/src/service.rs +++ b/tx-pool/src/service.rs @@ -14,6 +14,7 @@ use ckb_channel::oneshot; use ckb_error::{AnyError, Error}; use ckb_jsonrpc_types::BlockTemplate; use ckb_logger::error; +use ckb_logger::info; use ckb_network::{NetworkController, PeerIndex}; use ckb_snapshot::{Snapshot, SnapshotMgr}; use ckb_stop_handler::{SignalSender, StopHandler, WATCH_INIT}; @@ -97,6 +98,7 @@ pub(crate) enum Message { ClearPool(Request, ()>), GetAllEntryInfo(Request<(), TxPoolEntryInfo>), GetAllIds(Request<(), TxPoolIds>), + SavePool(Request<(), ()>), } /// Controller to the tx-pool service. @@ -402,7 +404,7 @@ impl TxPoolController { .map_err(Into::into) } - /// send suspend chunk process cmd + /// Sends suspend chunk process cmd pub fn suspend_chunk_process(&self) -> Result<(), AnyError> { self.chunk_tx .try_send(Command::Suspend) @@ -410,18 +412,57 @@ impl TxPoolController { .map_err(Into::into) } - /// send continue chunk process cmd + /// Sends continue chunk process cmd pub fn continue_chunk_process(&self) -> Result<(), AnyError> { self.chunk_tx .try_send(Command::Continue) .map_err(handle_send_cmd_error) .map_err(Into::into) } + + /// Saves tx pool into disk. + pub fn save_pool(&self) -> Result<(), AnyError> { + info!("Please be patient, tx-pool are saving data into disk ..."); + let (responder, response) = oneshot::channel(); + let request = Request::call((), responder); + self.sender + .try_send(Message::SavePool(request)) + .map_err(|e| { + let (_m, e) = handle_try_send_error(e); + e + })?; + block_in_place(|| response.recv()) + .map_err(handle_recv_error) + .map_err(Into::into) + } + + /// Load persisted txs into pool, assume that all txs are sorted + fn load_persisted_data(&self, txs: Vec) -> Result<(), AnyError> { + if !txs.is_empty() { + info!("Loading persisted tx-pool data, total {} txs", txs.len()); + let mut failed_txs = 0; + for tx in txs { + if self.submit_local_tx(tx)?.is_err() { + failed_txs += 1; + } + } + if failed_txs == 0 { + info!("Persisted tx-pool data is loaded"); + } else { + info!( + "Persisted tx-pool data is loaded, {} stale txs are ignored", + failed_txs + ); + } + } + Ok(()) + } } /// A builder used to create TxPoolService. pub struct TxPoolServiceBuilder { pub(crate) tx_pool_config: TxPoolConfig, + pub(crate) tx_pool_controller: TxPoolController, pub(crate) snapshot: Arc, pub(crate) block_assembler: Option, pub(crate) txs_verify_cache: Arc>, @@ -455,8 +496,21 @@ impl TxPoolServiceBuilder { let chunk = Arc::new(RwLock::new(ChunkQueue::new())); let started = Arc::new(AtomicBool::new(false)); + let stop = StopHandler::new(SignalSender::Watch(signal_sender), None); + let chunk_stop = StopHandler::new(SignalSender::Crossbeam(chunk_tx.clone()), None); + let controller = TxPoolController { + sender, + reorg_sender, + handle: handle.clone(), + chunk_stop, + chunk_tx, + stop, + started: Arc::clone(&started), + }; + let builder = TxPoolServiceBuilder { tx_pool_config, + tx_pool_controller: controller.clone(), snapshot, block_assembler: block_assembler_config.map(BlockAssembler::new), txs_verify_cache, @@ -469,20 +523,9 @@ impl TxPoolServiceBuilder { tx_relay_sender, chunk_rx, chunk, - started: Arc::clone(&started), - }; - - let stop = StopHandler::new(SignalSender::Watch(signal_sender), None); - let chunk_stop = StopHandler::new(SignalSender::Crossbeam(chunk_tx.clone()), None); - let controller = TxPoolController { - sender, - reorg_sender, - handle: handle.clone(), - chunk_stop, - chunk_tx, - stop, started, }; + (builder, controller) } @@ -516,8 +559,17 @@ impl TxPoolServiceBuilder { Arc::clone(&last_txs_updated_at), ); + let txs = match tx_pool.load_from_file() { + Ok(txs) => txs, + Err(e) => { + error!("{}", e.to_string()); + error!("Failed to load txs from tx-pool persisted data file, all txs are ignored"); + Vec::new() + } + }; + let service = TxPoolService { - tx_pool_config: Arc::new(tx_pool.config), + tx_pool_config: Arc::new(tx_pool.config.clone()), tx_pool: Arc::new(RwLock::new(tx_pool)), orphan: Arc::new(RwLock::new(OrphanPool::new())), block_assembler: self.block_assembler, @@ -581,7 +633,9 @@ impl TxPoolServiceBuilder { } } }); - + if let Err(err) = self.tx_pool_controller.load_persisted_data(txs) { + error!("Failed to import persisted txs, cause: {}", err); + } self.started.store(true, Ordering::Relaxed); } } @@ -781,5 +835,11 @@ async fn process(mut service: TxPoolService, message: Message) { error!("responder send get_ids failed {:?}", e) }; } + Message::SavePool(Request { responder, .. }) => { + service.save_pool().await; + if let Err(e) = responder.send(()) { + error!("responder send save_pool failed {:?}", e) + }; + } } } diff --git a/util/app-config/src/app_config.rs b/util/app-config/src/app_config.rs index 741e6edf2a..26ed32f479 100644 --- a/util/app-config/src/app_config.rs +++ b/util/app-config/src/app_config.rs @@ -286,6 +286,8 @@ impl CKBAppConfig { .log_dir .join(subcommand_name.to_string() + ".log"); + self.tx_pool.adjust(root_dir, &self.data_dir); + if subcommand_name == cli::CMD_RESET_DATA { return Ok(self); } diff --git a/util/app-config/src/configs/tx_pool.rs b/util/app-config/src/configs/tx_pool.rs index 670d2bfd13..88fa7a3fac 100644 --- a/util/app-config/src/configs/tx_pool.rs +++ b/util/app-config/src/configs/tx_pool.rs @@ -2,10 +2,11 @@ use ckb_jsonrpc_types::{FeeRateDef, JsonBytes, ScriptHashType}; use ckb_types::core::{Cycle, FeeRate}; use ckb_types::H256; use serde::{Deserialize, Serialize}; +use std::path::{Path, PathBuf}; // The default values are set in the legacy version. /// Transaction pool configuration -#[derive(Copy, Clone, Debug, Serialize)] +#[derive(Clone, Debug, Serialize)] pub struct TxPoolConfig { /// Keep the transaction pool below mb pub max_mem_size: usize, @@ -18,6 +19,11 @@ pub struct TxPoolConfig { pub max_tx_verify_cycles: Cycle, /// max ancestors size limit for a single tx pub max_ancestors_count: usize, + /// The file to persist the tx pool on the disk when tx pool have been shutdown. + /// + /// By default, it is a file inside the data directory. + #[serde(default)] + pub persisted_data: PathBuf, } /// Block assembler config options. @@ -45,3 +51,22 @@ pub struct BlockAssemblerConfig { const fn default_use_binary_version_as_message_prefix() -> bool { true } + +impl TxPoolConfig { + /// Canonicalizes paths in the config options. + /// + /// If `self.persisted_data` is not set, set it to `data_dir / tx_pool_persisted_data`. + /// + /// If `self.path` is relative, convert them to absolute path using + /// `root_dir` as current working directory. + pub fn adjust>(&mut self, root_dir: &Path, data_dir: P) { + if self.persisted_data.to_str().is_none() || self.persisted_data.to_str() == Some("") { + self.persisted_data = data_dir + .as_ref() + .to_path_buf() + .join("tx_pool_persisted_data"); + } else if self.persisted_data.is_relative() { + self.persisted_data = root_dir.to_path_buf().join(&self.persisted_data) + } + } +} diff --git a/util/app-config/src/legacy/tx_pool.rs b/util/app-config/src/legacy/tx_pool.rs index a54ec6b85b..0c491e37ac 100644 --- a/util/app-config/src/legacy/tx_pool.rs +++ b/util/app-config/src/legacy/tx_pool.rs @@ -2,6 +2,7 @@ use ckb_chain_spec::consensus::TWO_IN_TWO_OUT_CYCLES; use ckb_jsonrpc_types::FeeRateDef; use ckb_types::core::{Cycle, FeeRate}; use serde::Deserialize; +use std::path::PathBuf; // default min fee rate, 1000 shannons per kilobyte const DEFAULT_MIN_FEE_RATE: FeeRate = FeeRate::from_u64(1000); @@ -22,6 +23,8 @@ pub(crate) struct TxPoolConfig { min_fee_rate: FeeRate, max_tx_verify_cycles: Cycle, max_ancestors_count: usize, + #[serde(default)] + persisted_data: PathBuf, } impl Default for crate::TxPoolConfig { @@ -41,6 +44,7 @@ impl Default for TxPoolConfig { min_fee_rate: DEFAULT_MIN_FEE_RATE, max_tx_verify_cycles: DEFAULT_MAX_TX_VERIFY_CYCLES, max_ancestors_count: DEFAULT_MAX_ANCESTORS_COUNT, + persisted_data: Default::default(), } } } @@ -56,6 +60,7 @@ impl From for crate::TxPoolConfig { min_fee_rate, max_tx_verify_cycles, max_ancestors_count, + persisted_data, } = input; Self { max_mem_size, @@ -63,6 +68,7 @@ impl From for crate::TxPoolConfig { min_fee_rate, max_tx_verify_cycles, max_ancestors_count, + persisted_data, } } } diff --git a/util/launcher/src/lib.rs b/util/launcher/src/lib.rs index 407565a18f..320d22ffd2 100644 --- a/util/launcher/src/lib.rs +++ b/util/launcher/src/lib.rs @@ -197,7 +197,7 @@ impl Launcher { let (shared, pack) = shared_builder .consensus(self.args.consensus.clone()) - .tx_pool_config(self.args.config.tx_pool) + .tx_pool_config(self.args.config.tx_pool.clone()) .notify_config(self.args.config.notify.clone()) .store_config(self.args.config.store) .block_assembler_config(block_assembler_config)