diff --git a/Cargo.lock b/Cargo.lock index 718a9f57425..660f266f3eb 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -279,12 +279,6 @@ dependencies = [ "serde", ] -[[package]] -name = "case" -version = "1.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fd6c0e7b807d60291f42f33f58480c0bfafe28ed08286446f45e463728cf9c1c" - [[package]] name = "cast" version = "0.2.2" @@ -1276,14 +1270,8 @@ dependencies = [ "ckb-verification", "faketime", "lru", -<<<<<<< HEAD "sentry", "tokio", -======= - "molecule", - "molecule-codegen", - "tokio 0.2.25", ->>>>>>> feat: cache tx-pool state into a file when it has been shutdown ] [[package]] @@ -2875,12 +2863,6 @@ dependencies = [ "libc", ] -[[package]] -name = "maplit" -version = "1.0.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3e2e65a1a2e43cfcb47a895c4c8b10d1f4a61097f9f254f183aee60cad9c651d" - [[package]] name = "mapr" version = "0.8.0" @@ -3029,23 +3011,6 @@ dependencies = [ "faster-hex", ] -[[package]] -name = "molecule-codegen" -version = "0.6.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fb688831cecb451c0cdb1aba6b429cac07eec8812becf3d1ca80d071f0f29707" -dependencies = [ - "case", - "molecule", - "pest", - "pest_derive", - "proc-macro2", - "property", - "quote", - "same-file", - "semver", -] - [[package]] name = "native-tls" version = "0.2.7" @@ -3425,43 +3390,6 @@ dependencies = [ ] [[package]] -<<<<<<< HEAD -======= -name = "pest_derive" -version = "2.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "833d1ae558dc601e9a60366421196a8d94bc0ac980476d0b67e1d0988d72b2d0" -dependencies = [ - "pest", - "pest_generator", -] - -[[package]] -name = "pest_generator" -version = "2.1.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "99b8db626e31e5b81787b9783425769681b347011cc59471e33ea46d2ea0cf55" -dependencies = [ - "pest", - "pest_meta", - "proc-macro2", - "quote", - "syn", -] - -[[package]] -name = "pest_meta" -version = "2.1.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "54be6e404f5317079812fc8f9f5279de376d8856929e21c184ecf6bbd692a11d" -dependencies = [ - "maplit", - "pest", - "sha-1", -] - -[[package]] ->>>>>>> feat: cache tx-pool state into a file when it has been shutdown name = "petgraph" version = "0.5.1" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -3645,7 +3573,6 @@ dependencies = [ ] [[package]] -<<<<<<< HEAD name = "prometheus" version = "0.12.0" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -3658,16 +3585,6 @@ dependencies = [ "parking_lot", "protobuf", "thiserror", -======= -name = "property" -version = "0.3.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "889bff6ebe76dbef2685598944997eb7f15c9854a2e34db853a51c6ac158b9d5" -dependencies = [ - "proc-macro2", - "quote", - "syn", ->>>>>>> feat: cache tx-pool state into a file when it has been shutdown ] [[package]] diff --git a/benches/benches/benchmarks/overall.rs b/benches/benches/benchmarks/overall.rs index bf92a3136b1..eb6bbe12960 100644 --- a/benches/benches/benchmarks/overall.rs +++ b/benches/benches/benchmarks/overall.rs @@ -126,7 +126,7 @@ pub fn setup_chain(txs_size: usize) -> (Shared, ChainController) { .unwrap(); let network = dummy_network(&shared); - pack.take_tx_pool_builder().start(network); + pack.take_tx_pool_builder().start(network).unwrap(); let chain_service = ChainService::new(shared.clone(), pack.take_proposal_table()); let chain_controller = chain_service.start(Some("ChainService")); diff --git a/chain/src/tests/block_assembler.rs b/chain/src/tests/block_assembler.rs index f0dc32681bd..4a49c347aa6 100644 --- a/chain/src/tests/block_assembler.rs +++ b/chain/src/tests/block_assembler.rs @@ -45,7 +45,7 @@ fn start_chain(consensus: Option) -> (ChainController, Shared) { .unwrap(); let network = dummy_network(&shared); - pack.take_tx_pool_builder().start(network); + pack.take_tx_pool_builder().start(network).unwrap(); let chain_service = ChainService::new(shared.clone(), pack.take_proposal_table()); let chain_controller = chain_service.start::<&str>(None); diff --git a/chain/src/tests/util.rs b/chain/src/tests/util.rs index 63d8fdcf74c..bc0062335b3 100644 --- a/chain/src/tests/util.rs +++ b/chain/src/tests/util.rs @@ -127,7 +127,7 @@ pub(crate) fn start_chain(consensus: Option) -> (ChainController, Sha .build() .unwrap(); let network = dummy_network(&shared); - pack.take_tx_pool_builder().start(network); + pack.take_tx_pool_builder().start(network).unwrap(); let chain_service = ChainService::new(shared.clone(), pack.take_proposal_table()); let chain_controller = chain_service.start::<&str>(None); diff --git a/ckb-bin/src/subcommand/replay.rs b/ckb-bin/src/subcommand/replay.rs index 42bc0080deb..51333b0fa2e 100644 --- a/ckb-bin/src/subcommand/replay.rs +++ b/ckb-bin/src/subcommand/replay.rs @@ -19,7 +19,7 @@ pub fn replay(args: ReplayArgs, async_handle: Handle) -> Result<(), ExitCode> { )?; let (shared, _) = shared_builder .consensus(args.consensus.clone()) - .tx_pool_config(args.config.tx_pool) + .tx_pool_config(args.config.tx_pool.clone()) .build()?; if !args.tmp_target.is_dir() { diff --git a/ckb-bin/src/subcommand/run.rs b/ckb-bin/src/subcommand/run.rs index 24db512490f..4471da95c0c 100644 --- a/ckb-bin/src/subcommand/run.rs +++ b/ckb-bin/src/subcommand/run.rs @@ -53,7 +53,10 @@ pub fn run(args: RunArgs, version: Version, async_handle: Handle) -> Result<(), ); let tx_pool_builder = pack.take_tx_pool_builder(); - tx_pool_builder.start(network_controller); + tx_pool_builder.start(network_controller).map_err(|err| { + eprintln!("TxPool Error: {}", err); + ExitCode::Failure + })?; let exit_handler_clone = exit_handler.clone(); ctrlc::set_handler(move || { @@ -63,18 +66,10 @@ pub fn run(args: RunArgs, version: Version, async_handle: Handle) -> Result<(), exit_handler.wait_for_exit(); info!("Finishing work, please wait..."); + shared.tx_pool_controller().save_pool().map_err(|err| { + eprintln!("TxPool Error: {}", err); + ExitCode::Failure + })?; drop(rpc_server); - drop(network_controller); - shared - .tx_pool_controller() - .persist_tx_pool() - .map_err(|err| { - eprintln!("TxPool Error: {}", err); - ExitCode::Failure - })? - .map_err(|err| { - eprintln!("TxPool Error: {}", err); - ExitCode::Failure - })?; Ok(()) } diff --git a/rpc/src/test.rs b/rpc/src/test.rs index d19c8fd67d9..4a2e776bb89 100644 --- a/rpc/src/test.rs +++ b/rpc/src/test.rs @@ -179,7 +179,8 @@ fn setup_rpc_test_suite(height: u64) -> RpcTestSuite { }; pack.take_tx_pool_builder() - .start(network_controller.clone()); + .start(network_controller.clone()) + .unwrap(); // Build chain, insert [1, height) blocks let mut parent = always_success_consensus().genesis_block; diff --git a/sync/src/relayer/tests/helper.rs b/sync/src/relayer/tests/helper.rs index 8d44a8d146c..5c94493da4e 100644 --- a/sync/src/relayer/tests/helper.rs +++ b/sync/src/relayer/tests/helper.rs @@ -153,7 +153,7 @@ pub(crate) fn build_chain(tip: BlockNumber) -> (Relayer, OutPoint) { }; let network = dummy_network(&shared); - pack.take_tx_pool_builder().start(network); + pack.take_tx_pool_builder().start(network).unwrap(); let chain_controller = { let chain_service = ChainService::new(shared.clone(), pack.take_proposal_table()); diff --git a/test/src/specs/tx_pool/pool_persisted.rs b/test/src/specs/tx_pool/pool_persisted.rs index 58ba033c4bf..0c32ef8f4a8 100644 --- a/test/src/specs/tx_pool/pool_persisted.rs +++ b/test/src/specs/tx_pool/pool_persisted.rs @@ -50,12 +50,10 @@ impl Spec for PoolPersisted { let tx_pool_info_reloaded = node0.get_tip_tx_pool_info(); info!("TxPool should be same as before"); - assert_eq!(tx_pool_info_original, tx_pool_info_reloaded); + assert_eq!(tx_pool_info_original.total_tx_size, tx_pool_info_reloaded.total_tx_size); + assert_eq!(tx_pool_info_original.total_tx_cycles, tx_pool_info_reloaded.total_tx_cycles); info!("Check the specific values of TxPool state"); node0.assert_tx_pool_size(txs_hash2.len() as u64, txs_hash1.len() as u64); - assert!(tx_pool_info_reloaded.total_tx_size.value() > 0); - assert!(tx_pool_info_reloaded.total_tx_cycles.value() > 0); - assert!(tx_pool_info_reloaded.last_txs_updated_at.value() > 0); } } diff --git a/tx-pool/build.rs b/tx-pool/build.rs deleted file mode 100644 index 70f6fc00dd8..00000000000 --- a/tx-pool/build.rs +++ /dev/null @@ -1,18 +0,0 @@ -#![allow(missing_docs)] - -use molecule_codegen::{Compiler, Language}; - -fn compile_schema(schema: &str) { - println!("cargo:rerun-if-changed={}", schema); - let mut compiler = Compiler::new(); - compiler - .input_schema_file(schema) - .generate_code(Language::Rust) - .output_dir_set_default() - .run() - .unwrap(); -} - -fn main() { - compile_schema("schemas/persisted.mol"); -} diff --git a/tx-pool/schemas/persisted.mol b/tx-pool/schemas/persisted.mol deleted file mode 100644 index 3c637fb80f3..00000000000 --- a/tx-pool/schemas/persisted.mol +++ /dev/null @@ -1,10 +0,0 @@ -import referenced; - -table TxPool{ - version: Uint32, - transactions: TransactionVec, -} - -table TxPoolMeta { - version: Uint32, -} diff --git a/tx-pool/schemas/referenced.mol b/tx-pool/schemas/referenced.mol deleted file mode 100644 index ef0ca89a6ee..00000000000 --- a/tx-pool/schemas/referenced.mol +++ /dev/null @@ -1,67 +0,0 @@ -/* Referenced Types */ - -array Uint32 [byte; 4]; -array Uint64 [byte; 8]; - -array Byte32 [byte; 32]; -vector Byte32Vec ; - -vector Bytes ; -vector BytesVec ; - -array ProposalShortId [byte; 10]; - -vector ProposalShortIdVec ; -vector OutPointVec ; -vector CellDepVec ; -vector CellInputVec ; -vector CellOutputVec ; - -option ScriptOpt (Script); - -table Script { - code_hash: Byte32, - hash_type: byte, - args: Bytes, -} - -struct OutPoint { - tx_hash: Byte32, - index: Uint32, -} - -struct CellInput { - since: Uint64, - previous_output: OutPoint, -} - -table CellOutput { - capacity: Uint64, - lock: Script, - type_: ScriptOpt, -} - -struct CellDep { - out_point: OutPoint, - dep_type: byte, -} - -table RawTransaction { - version: Uint32, - cell_deps: CellDepVec, - header_deps: Byte32Vec, - inputs: CellInputVec, - outputs: CellOutputVec, - outputs_data: BytesVec, -} - -table Transaction { - raw: RawTransaction, - witnesses: BytesVec, -} - -table TransactionView { - hash: Byte32, - witness_hash: Byte32, - data: Transaction, -} diff --git a/tx-pool/src/persisted.rs b/tx-pool/src/persisted.rs new file mode 100644 index 00000000000..12a1048d9fa --- /dev/null +++ b/tx-pool/src/persisted.rs @@ -0,0 +1,97 @@ +use crate::TxPool; +use ckb_error::{AnyError, OtherError}; +use ckb_types::{ + core::TransactionView, + packed::{TransactionVec, TransactionVecReader}, + prelude::*, +}; +use std::{ + fs::OpenOptions, + io::{Read as _, Write as _}, +}; + +/// The version of the persisted tx-pool data. +pub(crate) const VERSION: u32 = 1; + +impl TxPool { + pub(crate) fn load_from_file(&self) -> Result, AnyError> { + let mut persisted_data_file = self.config.persisted_data.clone(); + persisted_data_file.set_extension(format!("v{}", VERSION)); + + if persisted_data_file.exists() { + let mut file = OpenOptions::new() + .read(true) + .open(&persisted_data_file) + .map_err(|err| { + let errmsg = format!( + "Failed to open the tx-pool persisted data file [{:?}], cause: {}", + persisted_data_file, err + ); + OtherError::new(errmsg) + })?; + let mut buffer = Vec::new(); + file.read_to_end(&mut buffer).map_err(|err| { + let errmsg = format!( + "Failed to read the tx-pool persisted data file [{:?}], cause: {}", + persisted_data_file, err + ); + OtherError::new(errmsg) + })?; + + let persisted_data = TransactionVecReader::from_slice(&buffer) + .map_err(|err| { + let errmsg = format!( + "The tx-pool persisted data file [{:?}] is broken, please delete it and restart, cause: {}", + persisted_data_file, err + ); + OtherError::new(errmsg) + })? + .to_entity(); + + Ok(persisted_data + .into_iter() + .map(|tx| tx.into_view()) + .collect()) + } else { + Ok(Vec::new()) + } + } + + pub(crate) fn save_into_file(&mut self) -> Result<(), AnyError> { + let mut persisted_data_file = self.config.persisted_data.clone(); + persisted_data_file.set_extension(format!("v{}", VERSION)); + + let mut file = OpenOptions::new() + .create(true) + .write(true) + .truncate(true) + .open(&persisted_data_file) + .map_err(|err| { + let errmsg = format!( + "Failed to open the tx-pool persisted data file [{:?}], cause: {}", + persisted_data_file, err + ); + OtherError::new(errmsg) + })?; + + let txs = TransactionVec::new_builder() + .extend(self.drain_all_transactions().iter().map(|tx| tx.data())) + .build(); + + file.write_all(txs.as_slice()).map_err(|err| { + let errmsg = format!( + "Failed to write the tx-pool persisted data into file [{:?}], cause: {}", + persisted_data_file, err + ); + OtherError::new(errmsg) + })?; + file.sync_all().map_err(|err| { + let errmsg = format!( + "Failed to sync the tx-pool persisted data file [{:?}], cause: {}", + persisted_data_file, err + ); + OtherError::new(errmsg) + })?; + Ok(()) + } +} diff --git a/tx-pool/src/persisted/conversion.rs b/tx-pool/src/persisted/conversion.rs deleted file mode 100644 index aeed8b380c1..00000000000 --- a/tx-pool/src/persisted/conversion.rs +++ /dev/null @@ -1,428 +0,0 @@ -use ckb_types::{bytes, packed, prelude::*}; -use std::collections::{HashMap, HashSet}; - -use crate::{component, persisted}; - -impl Pack for component::CacheEntry { - fn pack(&self) -> persisted::CacheEntry { - persisted::CacheEntry::new_builder() - .cycles(self.cycles.pack()) - .fee(self.fee.pack()) - .build() - } -} - -impl<'r> Unpack for persisted::CacheEntryReader<'r> { - fn unpack(&self) -> component::CacheEntry { - component::CacheEntry { - cycles: self.cycles().unpack(), - fee: self.fee().unpack(), - } - } -} - -impl Pack for component::TxEntry { - fn pack(&self) -> persisted::TxEntry { - persisted::TxEntry::new_builder() - .transaction(self.transaction.pack()) - .cycles(self.cycles.pack()) - .size(self.size.pack()) - .fee(self.fee.pack()) - .ancestors_size(self.ancestors_size.pack()) - .ancestors_fee(self.ancestors_fee.pack()) - .ancestors_cycles(self.ancestors_cycles.pack()) - .ancestors_count(self.ancestors_count.pack()) - .related_out_points(self.related_out_points.clone().pack()) - .build() - } -} - -impl<'r> Unpack for persisted::TxEntryReader<'r> { - fn unpack(&self) -> component::TxEntry { - let related_out_points = self - .related_out_points() - .iter() - .map(|op| op.to_entity()) - .collect(); - component::TxEntry { - transaction: self.transaction().unpack(), - cycles: self.cycles().unpack(), - size: self.size().unpack(), - fee: self.fee().unpack(), - ancestors_size: self.ancestors_size().unpack(), - ancestors_fee: self.ancestors_fee().unpack(), - ancestors_cycles: self.ancestors_cycles().unpack(), - ancestors_count: self.ancestors_count().unpack(), - related_out_points, - } - } -} - -impl Pack for component::DefectEntry { - fn pack(&self) -> persisted::DefectEntry { - let cache_entry = self - .cache_entry - .map(|inner| persisted::CacheEntryOpt::new_unchecked(inner.pack().as_bytes())) - .unwrap_or_else(Default::default); - persisted::DefectEntry::new_builder() - .transaction(self.transaction.pack()) - .refs_count(self.refs_count.pack()) - .cache_entry(cache_entry) - .size(self.size.pack()) - .timestamp(self.timestamp.pack()) - .build() - } -} - -impl<'r> Unpack for persisted::DefectEntryReader<'r> { - fn unpack(&self) -> component::DefectEntry { - component::DefectEntry { - transaction: self.transaction().unpack(), - refs_count: self.refs_count().unpack(), - cache_entry: self.cache_entry().to_opt().map(|x| x.unpack()), - size: self.size().unpack(), - timestamp: self.timestamp().unpack(), - } - } -} - -impl Pack for component::TxLink { - fn pack(&self) -> persisted::TxLink { - persisted::TxLink::new_builder() - .parents(self.parents.clone().into_iter().pack()) - .children(self.children.clone().into_iter().pack()) - .build() - } -} - -impl<'r> Unpack for persisted::TxLinkReader<'r> { - fn unpack(&self) -> component::TxLink { - component::TxLink { - parents: self.parents().to_entity().into_iter().collect(), - children: self.children().to_entity().into_iter().collect(), - } - } -} - -impl Pack for component::AncestorsScoreSortKey { - fn pack(&self) -> persisted::AncestorsScoreSortKey { - persisted::AncestorsScoreSortKey::new_builder() - .fee(self.fee.pack()) - .vbytes(self.vbytes.pack()) - .id(self.id.clone()) - .ancestors_fee(self.ancestors_fee.pack()) - .ancestors_vbytes(self.ancestors_vbytes.pack()) - .ancestors_size(self.ancestors_size.pack()) - .build() - } -} - -impl<'r> Unpack for persisted::AncestorsScoreSortKeyReader<'r> { - fn unpack(&self) -> component::AncestorsScoreSortKey { - component::AncestorsScoreSortKey { - fee: self.fee().unpack(), - vbytes: self.vbytes().unpack(), - id: self.id().to_entity(), - ancestors_fee: self.ancestors_fee().unpack(), - ancestors_vbytes: self.ancestors_vbytes().unpack(), - ancestors_size: self.ancestors_size().unpack(), - } - } -} - -impl Pack for (packed::ProposalShortId, bytes::Bytes) { - fn pack(&self) -> persisted::ProposalShortIdKeyValue { - let (ref key, ref value) = self; - persisted::ProposalShortIdKeyValue::new_builder() - .key(key.to_owned()) - .value(value.pack()) - .build() - } -} - -impl Pack - for HashMap -{ - fn pack(&self) -> persisted::ProposalShortIdKeyValueVec { - let items = self - .iter() - .map(|(key, value)| (key.to_owned(), value.pack().as_bytes()).pack()); - persisted::ProposalShortIdKeyValueVec::new_builder() - .extend(items) - .build() - } -} - -impl Pack - for HashMap -{ - fn pack(&self) -> persisted::ProposalShortIdKeyValueVec { - let items = self - .iter() - .map(|(key, value)| (key.to_owned(), value.pack().as_bytes()).pack()); - persisted::ProposalShortIdKeyValueVec::new_builder() - .extend(items) - .build() - } -} - -impl Pack - for HashMap -{ - fn pack(&self) -> persisted::ProposalShortIdKeyValueVec { - let items = self - .iter() - .map(|(key, value)| (key.to_owned(), value.pack().as_bytes()).pack()); - persisted::ProposalShortIdKeyValueVec::new_builder() - .extend(items) - .build() - } -} - -impl<'r> Unpack> - for persisted::ProposalShortIdKeyValueVecReader<'r> -{ - fn unpack(&self) -> HashMap { - self.iter() - .map(|p| { - let k = p.key().to_entity(); - let v = persisted::TxEntryReader::new_unchecked(p.value().raw_data()).unpack(); - (k, v) - }) - .collect() - } -} - -impl<'r> Unpack> - for persisted::ProposalShortIdKeyValueVecReader<'r> -{ - fn unpack(&self) -> HashMap { - self.iter() - .map(|p| { - let k = p.key().to_entity(); - let v = persisted::TxLinkReader::new_unchecked(p.value().raw_data()).unpack(); - (k, v) - }) - .collect() - } -} - -impl<'r> Unpack> - for persisted::ProposalShortIdKeyValueVecReader<'r> -{ - fn unpack(&self) -> HashMap { - self.iter() - .map(|p| { - let k = p.key().to_entity(); - let v = persisted::DefectEntryReader::new_unchecked(p.value().raw_data()).unpack(); - (k, v) - }) - .collect() - } -} - -impl Pack for component::SortedTxMap { - fn pack(&self) -> persisted::SortedTxMap { - let sorted_index = persisted::AncestorsScoreSortKeyVec::new_builder() - .set(self.sorted_index.iter().map(|v| v.pack()).collect()) - .build(); - persisted::SortedTxMap::new_builder() - .entries(self.entries.pack()) - .sorted_index(sorted_index) - .links(self.links.pack()) - .max_ancestors_count(self.max_ancestors_count.pack()) - .build() - } -} - -impl<'r> Unpack for persisted::SortedTxMapReader<'r> { - fn unpack(&self) -> component::SortedTxMap { - component::SortedTxMap { - entries: self.entries().unpack(), - sorted_index: self.sorted_index().iter().map(|op| op.unpack()).collect(), - links: self.links().unpack(), - max_ancestors_count: self.max_ancestors_count().unpack(), - } - } -} - -impl Pack for (packed::OutPoint, bytes::Bytes) { - fn pack(&self) -> persisted::OutPointKeyValue { - let (ref key, ref value) = self; - persisted::OutPointKeyValue::new_builder() - .key(key.to_owned()) - .value(value.pack()) - .build() - } -} - -impl Pack - for HashMap> -{ - fn pack(&self) -> persisted::OutPointKeyValueVec { - let items = self.iter().map(|(key, value)| { - let value_opt = value - .clone() - .map(|inner| persisted::ProposalShortIdOpt::new_unchecked(inner.as_bytes())) - .unwrap_or_else(Default::default); - (key.to_owned(), value_opt.as_bytes()).pack() - }); - persisted::OutPointKeyValueVec::new_builder() - .extend(items) - .build() - } -} - -impl Pack - for HashMap> -{ - fn pack(&self) -> persisted::OutPointKeyValueVec { - let items = self - .iter() - .map(|(key, value)| (key.to_owned(), value.clone().pack().as_bytes()).pack()); - persisted::OutPointKeyValueVec::new_builder() - .extend(items) - .build() - } -} - -impl Pack - for HashMap> -{ - fn pack(&self) -> persisted::OutPointKeyValueVec { - let items = self.iter().map(|(key, value)| { - (key.to_owned(), value.clone().into_iter().pack().as_bytes()).pack() - }); - persisted::OutPointKeyValueVec::new_builder() - .extend(items) - .build() - } -} - -impl<'r> Unpack>> - for persisted::OutPointKeyValueVecReader<'r> -{ - fn unpack(&self) -> HashMap> { - self.iter() - .map(|p| { - let k = p.key().to_entity(); - let v = persisted::ProposalShortIdOptReader::new_unchecked(p.value().raw_data()) - .to_entity() - .to_opt(); - (k, v) - }) - .collect() - } -} - -impl<'r> Unpack>> - for persisted::OutPointKeyValueVecReader<'r> -{ - fn unpack(&self) -> HashMap> { - self.iter() - .map(|p| { - let k = p.key().to_entity(); - let v = packed::ProposalShortIdVecReader::new_unchecked(p.value().raw_data()) - .to_entity() - .into_iter() - .collect(); - (k, v) - }) - .collect() - } -} - -impl<'r> Unpack>> - for persisted::OutPointKeyValueVecReader<'r> -{ - fn unpack(&self) -> HashMap> { - self.iter() - .map(|p| { - let k = p.key().to_entity(); - let v = packed::ProposalShortIdVecReader::new_unchecked(p.value().raw_data()) - .to_entity() - .into_iter() - .collect(); - (k, v) - }) - .collect() - } -} - -impl Pack - for component::Edges -{ - fn pack(&self) -> persisted::OutPointEdges { - persisted::OutPointEdges::new_builder() - .inner(self.inner.pack()) - .outer(self.outer.pack()) - .deps(self.deps.pack()) - .build() - } -} - -impl<'r> Unpack> - for persisted::OutPointEdgesReader<'r> -{ - fn unpack(&self) -> component::Edges { - component::Edges { - inner: self.inner().unpack(), - outer: self.outer().unpack(), - deps: self.deps().unpack(), - } - } -} - -impl Pack for component::PendingQueue { - fn pack(&self) -> persisted::PendingQueue { - persisted::PendingQueue::new_builder() - .inner(self.inner.pack()) - .build() - } -} - -impl<'r> Unpack for persisted::PendingQueueReader<'r> { - fn unpack(&self) -> component::PendingQueue { - component::PendingQueue { - inner: self.inner().unpack(), - } - } -} - -impl Pack for component::ProposedPool { - fn pack(&self) -> persisted::ProposedPool { - persisted::ProposedPool::new_builder() - .edges(self.edges.pack()) - .inner(self.inner.pack()) - .build() - } -} - -impl<'r> Unpack for persisted::ProposedPoolReader<'r> { - fn unpack(&self) -> component::ProposedPool { - component::ProposedPool { - edges: self.edges().unpack(), - inner: self.inner().unpack(), - } - } -} - -impl Pack for component::OrphanPool { - fn pack(&self) -> persisted::OrphanPool { - persisted::OrphanPool::new_builder() - .vertices(self.vertices.pack()) - .edges(self.edges.pack()) - .prune_threshold(self.prune_threshold.pack()) - .build() - } -} - -impl<'r> Unpack for persisted::OrphanPoolReader<'r> { - fn unpack(&self) -> component::OrphanPool { - component::OrphanPool { - vertices: self.vertices().unpack(), - edges: self.edges().unpack(), - prune_threshold: self.prune_threshold().unpack(), - } - } -} diff --git a/tx-pool/src/persisted/generated.rs b/tx-pool/src/persisted/generated.rs deleted file mode 100644 index d0f4285a503..00000000000 --- a/tx-pool/src/persisted/generated.rs +++ /dev/null @@ -1,9 +0,0 @@ -//! Generated packed bytes wrappers. - -#![doc(hidden)] -#![allow(warnings)] -#![allow(clippy::all)] - -extern crate molecule; - -include!(concat!(env!("OUT_DIR"), "/", "persisted", ".rs")); diff --git a/tx-pool/src/persisted/mod.rs b/tx-pool/src/persisted/mod.rs deleted file mode 100644 index 667817a4c37..00000000000 --- a/tx-pool/src/persisted/mod.rs +++ /dev/null @@ -1,9 +0,0 @@ -use ckb_types::packed as referenced; - -mod conversion; -mod generated; - -pub(crate) use generated::*; - -/// The version of the persisted tx-pool data. -pub(crate) const VERSION: u32 = 1; diff --git a/tx-pool/src/pool.rs b/tx-pool/src/pool.rs index 43884ed390d..ba9853b930b 100644 --- a/tx-pool/src/pool.rs +++ b/tx-pool/src/pool.rs @@ -20,7 +20,6 @@ use ckb_types::{ Cycle, TransactionView, }, packed::{Byte32, OutPoint, ProposalShortId}, - prelude::*, }; use ckb_verification::{cache::CacheEntry, TxVerifyEnv}; use faketime::unix_time_as_millis; @@ -94,7 +93,6 @@ impl TxPool { const COMMITTED_HASH_CACHE_SIZE: usize = 100_000; TxPool { - config, pending: PendingQueue::new(), gap: PendingQueue::new(), proposed: ProposedPool::new(config.max_ancestors_count), @@ -102,6 +100,7 @@ impl TxPool { last_txs_updated_at, total_tx_size: 0, total_tx_cycles: 0, + config, snapshot, } } diff --git a/tx-pool/src/process.rs b/tx-pool/src/process.rs index 24102371c52..19b14a41eb0 100644 --- a/tx-pool/src/process.rs +++ b/tx-pool/src/process.rs @@ -982,16 +982,16 @@ impl TxPoolService { pub(crate) async fn clear_pool(&mut self, new_snapshot: Arc) { let mut tx_pool = self.tx_pool.write().await; - let config = tx_pool.config; + let config = tx_pool.config.clone(); self.last_txs_updated_at = Arc::new(AtomicU64::new(0)); *tx_pool = TxPool::new(config, new_snapshot, Arc::clone(&self.last_txs_updated_at)); } - pub(crate) async fn save_pool(&self) -> Result<(), AnyError> { - let tx_pool = self.tx_pool.read().await; - tx_pool - .persisted_data() - .save_into_file(&tx_pool.config.persisted_data) + pub(crate) async fn save_pool(&mut self) { + let mut tx_pool = self.tx_pool.write().await; + if let Err(err) = tx_pool.save_into_file() { + error!("failed to save pool, error: {:?}", err) + } } } diff --git a/tx-pool/src/service.rs b/tx-pool/src/service.rs index 4af4fed8438..ecd817d43d7 100644 --- a/tx-pool/src/service.rs +++ b/tx-pool/src/service.rs @@ -14,6 +14,7 @@ use ckb_channel::oneshot; use ckb_error::{AnyError, Error}; use ckb_jsonrpc_types::BlockTemplate; use ckb_logger::error; +use ckb_logger::info; use ckb_network::{NetworkController, PeerIndex}; use ckb_snapshot::{Snapshot, SnapshotMgr}; use ckb_stop_handler::{SignalSender, StopHandler, WATCH_INIT}; @@ -97,7 +98,7 @@ pub(crate) enum Message { ClearPool(Request, ()>), GetAllEntryInfo(Request<(), TxPoolEntryInfo>), GetAllIds(Request<(), TxPoolIds>), - SavePool(Request<(), Result<(), AnyError>>), + SavePool(Request<(), ()>), } /// Controller to the tx-pool service. @@ -419,25 +420,46 @@ impl TxPoolController { .map_err(Into::into) } - fn load_persisted_data(&self, data: persisted::TxPool) -> Result<(), AnyError> { - // a trick to commit transactions with the correct order - let mut remain_size = data.transactions().len(); - let mut txs_next_turn = Vec::new(); - for tx in data.transactions() { - let tx_view = tx.into_view(); - if self.submit_local_tx(tx_view.clone())?.is_err() { - txs_next_turn.push(tx_view); - } - } - while !txs_next_turn.is_empty() && remain_size != txs_next_turn.len() { - remain_size = txs_next_turn.len(); - let mut txs_failed = Vec::new(); - for tx in txs_next_turn { - if self.submit_local_tx(tx.clone())?.is_err() { - txs_failed.push(tx); + /// saves tx pool into disk. + pub fn save_pool(&self) -> Result<(), AnyError> { + info!("Please be patient, tx-pool are saving data into disk ..."); + let (responder, response) = oneshot::channel(); + let request = Request::call((), responder); + self.sender + .try_send(Message::SavePool(request)) + .map_err(|e| { + let (_m, e) = handle_try_send_error(e); + e + })?; + block_in_place(|| response.recv()) + .map_err(handle_recv_error) + .map_err(Into::into) + } + + fn load_persisted_data(&self, mut txs: Vec) -> Result<(), AnyError> { + if !txs.is_empty() { + info!("Loading persisted tx-pool data, total {} txs", txs.len()); + // a trick to commit transactions with the correct order + loop { + let mut failed_txs = Vec::new(); + for tx in txs.iter() { + if self.submit_local_tx(tx.clone())?.is_err() { + failed_txs.push(tx.clone()); + } + } + // return when all success or all failed + if failed_txs.is_empty() { + info!("Persisted tx-pool data is loaded"); + break; + } else if txs.len() == failed_txs.len() { + info!( + "Persisted tx-pool data is loaded, {} stale txs are ignored ()", + failed_txs.len() + ); + break; } + txs = failed_txs; } - txs_next_turn = txs_failed; } Ok(()) } @@ -446,6 +468,7 @@ impl TxPoolController { /// A builder used to create TxPoolService. pub struct TxPoolServiceBuilder { pub(crate) tx_pool_config: TxPoolConfig, + pub(crate) tx_pool_controller: TxPoolController, pub(crate) snapshot: Arc, pub(crate) block_assembler: Option, pub(crate) txs_verify_cache: Arc>, @@ -479,8 +502,21 @@ impl TxPoolServiceBuilder { let chunk = Arc::new(RwLock::new(ChunkQueue::new())); let started = Arc::new(AtomicBool::new(false)); + let stop = StopHandler::new(SignalSender::Watch(signal_sender), None); + let chunk_stop = StopHandler::new(SignalSender::Crossbeam(chunk_tx.clone()), None); + let controller = TxPoolController { + sender, + reorg_sender, + handle: handle.clone(), + chunk_stop, + chunk_tx, + stop, + started: Arc::clone(&started), + }; + let builder = TxPoolServiceBuilder { tx_pool_config, + tx_pool_controller: controller.clone(), snapshot, block_assembler: block_assembler_config.map(BlockAssembler::new), txs_verify_cache, @@ -493,20 +529,9 @@ impl TxPoolServiceBuilder { tx_relay_sender, chunk_rx, chunk, - started: Arc::clone(&started), - }; - - let stop = StopHandler::new(SignalSender::Watch(signal_sender), None); - let chunk_stop = StopHandler::new(SignalSender::Crossbeam(chunk_tx.clone()), None); - let controller = TxPoolController { - sender, - reorg_sender, - handle: handle.clone(), - chunk_stop, - chunk_tx, - stop, started, }; + (builder, controller) } @@ -531,7 +556,7 @@ impl TxPoolServiceBuilder { } /// Start a background thread tx-pool service by taking ownership of the Builder, and returns a TxPoolController. - pub fn start(self, network: NetworkController) { + pub fn start(self, network: NetworkController) -> Result<(), AnyError> { let last_txs_updated_at = Arc::new(AtomicU64::new(0)); let consensus = self.snapshot.cloned_consensus(); let tx_pool = TxPool::new( @@ -540,8 +565,10 @@ impl TxPoolServiceBuilder { Arc::clone(&last_txs_updated_at), ); + let txs = tx_pool.load_from_file()?; + let service = TxPoolService { - tx_pool_config: Arc::new(tx_pool.config), + tx_pool_config: Arc::new(tx_pool.config.clone()), tx_pool: Arc::new(RwLock::new(tx_pool)), orphan: Arc::new(RwLock::new(OrphanPool::new())), block_assembler: self.block_assembler, @@ -606,7 +633,9 @@ impl TxPoolServiceBuilder { } }); + self.tx_pool_controller.load_persisted_data(txs)?; self.started.store(true, Ordering::Relaxed); + Ok(()) } } @@ -806,8 +835,8 @@ async fn process(mut service: TxPoolService, message: Message) { }; } Message::SavePool(Request { responder, .. }) => { - let result = service.save_pool().await; - if let Err(e) = responder.send(result) { + service.save_pool().await; + if let Err(e) = responder.send(()) { error!("responder send save_pool failed {:?}", e) }; } diff --git a/util/app-config/src/configs/tx_pool.rs b/util/app-config/src/configs/tx_pool.rs index b9a5dc33975..88fa7a3fac9 100644 --- a/util/app-config/src/configs/tx_pool.rs +++ b/util/app-config/src/configs/tx_pool.rs @@ -6,7 +6,7 @@ use std::path::{Path, PathBuf}; // The default values are set in the legacy version. /// Transaction pool configuration -#[derive(Clone, Debug, Serialize, Deserialize)] +#[derive(Clone, Debug, Serialize)] pub struct TxPoolConfig { /// Keep the transaction pool below mb pub max_mem_size: usize, @@ -26,19 +26,6 @@ pub struct TxPoolConfig { pub persisted_data: PathBuf, } -impl Default for TxPoolConfig { - fn default() -> Self { - TxPoolConfig { - max_mem_size: 20_000_000, // 20mb - max_cycles: 200_000_000_000, - min_fee_rate: DEFAULT_MIN_FEE_RATE, - max_tx_verify_cycles: DEFAULT_MAX_TX_VERIFY_CYCLES, - max_ancestors_count: DEFAULT_MAX_ANCESTORS_COUNT, - persisted_data: Default::default(), - } - } -} - /// Block assembler config options. /// /// The block assembler section tells CKB how to claim the miner rewards. @@ -68,13 +55,16 @@ const fn default_use_binary_version_as_message_prefix() -> bool { impl TxPoolConfig { /// Canonicalizes paths in the config options. /// - /// If `self.persisted_data` is not set, set it to `data_dir / tx_pool.dat`. + /// If `self.persisted_data` is not set, set it to `data_dir / tx_pool_persisted_data`. /// /// If `self.path` is relative, convert them to absolute path using /// `root_dir` as current working directory. pub fn adjust>(&mut self, root_dir: &Path, data_dir: P) { if self.persisted_data.to_str().is_none() || self.persisted_data.to_str() == Some("") { - self.persisted_data = data_dir.as_ref().to_path_buf().join("tx_pool.dat"); + self.persisted_data = data_dir + .as_ref() + .to_path_buf() + .join("tx_pool_persisted_data"); } else if self.persisted_data.is_relative() { self.persisted_data = root_dir.to_path_buf().join(&self.persisted_data) } diff --git a/util/app-config/src/legacy/tx_pool.rs b/util/app-config/src/legacy/tx_pool.rs index a54ec6b85b2..0c491e37acb 100644 --- a/util/app-config/src/legacy/tx_pool.rs +++ b/util/app-config/src/legacy/tx_pool.rs @@ -2,6 +2,7 @@ use ckb_chain_spec::consensus::TWO_IN_TWO_OUT_CYCLES; use ckb_jsonrpc_types::FeeRateDef; use ckb_types::core::{Cycle, FeeRate}; use serde::Deserialize; +use std::path::PathBuf; // default min fee rate, 1000 shannons per kilobyte const DEFAULT_MIN_FEE_RATE: FeeRate = FeeRate::from_u64(1000); @@ -22,6 +23,8 @@ pub(crate) struct TxPoolConfig { min_fee_rate: FeeRate, max_tx_verify_cycles: Cycle, max_ancestors_count: usize, + #[serde(default)] + persisted_data: PathBuf, } impl Default for crate::TxPoolConfig { @@ -41,6 +44,7 @@ impl Default for TxPoolConfig { min_fee_rate: DEFAULT_MIN_FEE_RATE, max_tx_verify_cycles: DEFAULT_MAX_TX_VERIFY_CYCLES, max_ancestors_count: DEFAULT_MAX_ANCESTORS_COUNT, + persisted_data: Default::default(), } } } @@ -56,6 +60,7 @@ impl From for crate::TxPoolConfig { min_fee_rate, max_tx_verify_cycles, max_ancestors_count, + persisted_data, } = input; Self { max_mem_size, @@ -63,6 +68,7 @@ impl From for crate::TxPoolConfig { min_fee_rate, max_tx_verify_cycles, max_ancestors_count, + persisted_data, } } } diff --git a/util/launcher/src/lib.rs b/util/launcher/src/lib.rs index 407565a18f0..320d22ffd24 100644 --- a/util/launcher/src/lib.rs +++ b/util/launcher/src/lib.rs @@ -197,7 +197,7 @@ impl Launcher { let (shared, pack) = shared_builder .consensus(self.args.consensus.clone()) - .tx_pool_config(self.args.config.tx_pool) + .tx_pool_config(self.args.config.tx_pool.clone()) .notify_config(self.args.config.notify.clone()) .store_config(self.args.config.store) .block_assembler_config(block_assembler_config)