From 16b4dcbb4c9057404ae93cb953f84c16132950ba Mon Sep 17 00:00:00 2001 From: Boyu Yang Date: Fri, 19 Mar 2021 00:27:03 +0800 Subject: [PATCH 1/5] feat: cache tx-pool state into a file when it has been shutdown --- Cargo.lock | 83 +++++ ckb-bin/src/subcommand/run.rs | 13 +- tx-pool/build.rs | 18 ++ tx-pool/schemas/persisted.mol | 103 ++++++ tx-pool/schemas/referenced.mol | 67 ++++ tx-pool/src/lib.rs | 1 + tx-pool/src/persisted/conversion.rs | 428 +++++++++++++++++++++++++ tx-pool/src/persisted/generated.rs | 9 + tx-pool/src/persisted/mod.rs | 9 + tx-pool/src/pool.rs | 1 + tx-pool/src/process.rs | 1 + tx-pool/src/service.rs | 7 + util/app-config/src/app_config.rs | 2 + util/app-config/src/configs/tx_pool.rs | 37 ++- 14 files changed, 777 insertions(+), 2 deletions(-) create mode 100644 tx-pool/build.rs create mode 100644 tx-pool/schemas/persisted.mol create mode 100644 tx-pool/schemas/referenced.mol create mode 100644 tx-pool/src/persisted/conversion.rs create mode 100644 tx-pool/src/persisted/generated.rs create mode 100644 tx-pool/src/persisted/mod.rs diff --git a/Cargo.lock b/Cargo.lock index 660f266f3e..718a9f5742 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -279,6 +279,12 @@ dependencies = [ "serde", ] +[[package]] +name = "case" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fd6c0e7b807d60291f42f33f58480c0bfafe28ed08286446f45e463728cf9c1c" + [[package]] name = "cast" version = "0.2.2" @@ -1270,8 +1276,14 @@ dependencies = [ "ckb-verification", "faketime", "lru", +<<<<<<< HEAD "sentry", "tokio", +======= + "molecule", + "molecule-codegen", + "tokio 0.2.25", +>>>>>>> feat: cache tx-pool state into a file when it has been shutdown ] [[package]] @@ -2863,6 +2875,12 @@ dependencies = [ "libc", ] +[[package]] +name = "maplit" +version = "1.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3e2e65a1a2e43cfcb47a895c4c8b10d1f4a61097f9f254f183aee60cad9c651d" + [[package]] name = "mapr" version = "0.8.0" @@ -3011,6 +3029,23 @@ dependencies = [ "faster-hex", ] +[[package]] +name = "molecule-codegen" +version = "0.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fb688831cecb451c0cdb1aba6b429cac07eec8812becf3d1ca80d071f0f29707" +dependencies = [ + "case", + "molecule", + "pest", + "pest_derive", + "proc-macro2", + "property", + "quote", + "same-file", + "semver", +] + [[package]] name = "native-tls" version = "0.2.7" @@ -3390,6 +3425,43 @@ dependencies = [ ] [[package]] +<<<<<<< HEAD +======= +name = "pest_derive" +version = "2.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "833d1ae558dc601e9a60366421196a8d94bc0ac980476d0b67e1d0988d72b2d0" +dependencies = [ + "pest", + "pest_generator", +] + +[[package]] +name = "pest_generator" +version = "2.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "99b8db626e31e5b81787b9783425769681b347011cc59471e33ea46d2ea0cf55" +dependencies = [ + "pest", + "pest_meta", + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "pest_meta" +version = "2.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "54be6e404f5317079812fc8f9f5279de376d8856929e21c184ecf6bbd692a11d" +dependencies = [ + "maplit", + "pest", + "sha-1", +] + +[[package]] +>>>>>>> feat: cache tx-pool state into a file when it has been shutdown name = "petgraph" version = "0.5.1" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -3573,6 +3645,7 @@ dependencies = [ ] [[package]] +<<<<<<< HEAD name = "prometheus" version = "0.12.0" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -3585,6 +3658,16 @@ dependencies = [ "parking_lot", "protobuf", "thiserror", +======= +name = "property" +version = "0.3.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "889bff6ebe76dbef2685598944997eb7f15c9854a2e34db853a51c6ac158b9d5" +dependencies = [ + "proc-macro2", + "quote", + "syn", +>>>>>>> feat: cache tx-pool state into a file when it has been shutdown ] [[package]] diff --git a/ckb-bin/src/subcommand/run.rs b/ckb-bin/src/subcommand/run.rs index c55e1809cc..24db512490 100644 --- a/ckb-bin/src/subcommand/run.rs +++ b/ckb-bin/src/subcommand/run.rs @@ -64,6 +64,17 @@ pub fn run(args: RunArgs, version: Version, async_handle: Handle) -> Result<(), info!("Finishing work, please wait..."); drop(rpc_server); - + drop(network_controller); + shared + .tx_pool_controller() + .persist_tx_pool() + .map_err(|err| { + eprintln!("TxPool Error: {}", err); + ExitCode::Failure + })? + .map_err(|err| { + eprintln!("TxPool Error: {}", err); + ExitCode::Failure + })?; Ok(()) } diff --git a/tx-pool/build.rs b/tx-pool/build.rs new file mode 100644 index 0000000000..70f6fc00dd --- /dev/null +++ b/tx-pool/build.rs @@ -0,0 +1,18 @@ +#![allow(missing_docs)] + +use molecule_codegen::{Compiler, Language}; + +fn compile_schema(schema: &str) { + println!("cargo:rerun-if-changed={}", schema); + let mut compiler = Compiler::new(); + compiler + .input_schema_file(schema) + .generate_code(Language::Rust) + .output_dir_set_default() + .run() + .unwrap(); +} + +fn main() { + compile_schema("schemas/persisted.mol"); +} diff --git a/tx-pool/schemas/persisted.mol b/tx-pool/schemas/persisted.mol new file mode 100644 index 0000000000..2878b3f6e1 --- /dev/null +++ b/tx-pool/schemas/persisted.mol @@ -0,0 +1,103 @@ +import referenced; + +option ProposalShortIdOpt (ProposalShortId); + +struct CacheEntry { + cycles: Uint64, + fee: Uint64, +} + +option CacheEntryOpt (CacheEntry); + +table TxEntry { + transaction: TransactionView, + cycles: Uint64, + size: Uint32, + fee: Uint64, + ancestors_size: Uint32, + ancestors_fee: Uint64, + ancestors_cycles: Uint64, + ancestors_count: Uint32, + related_out_points: OutPointVec, +} + +table DefectEntry { + transaction: TransactionView, + refs_count: Uint32, + cache_entry: CacheEntryOpt, + size: Uint32, + timestamp: Uint64, +} + +table TxLink { + parents: ProposalShortIdVec, + children: ProposalShortIdVec, +} + +struct AncestorsScoreSortKey { + fee: Uint64, + vbytes: Uint64, + id: ProposalShortId, + ancestors_fee: Uint64, + ancestors_vbytes: Uint64, + ancestors_size: Uint32, +} + +vector AncestorsScoreSortKeyVec ; + +table ProposalShortIdKeyValue { + key: ProposalShortId, + value: Bytes, +} + +vector ProposalShortIdKeyValueVec ; + +table SortedTxMap { + entries: ProposalShortIdKeyValueVec, + sorted_index: AncestorsScoreSortKeyVec, + links: ProposalShortIdKeyValueVec, + max_ancestors_count: Uint32, +} + +table OutPointKeyValue { + key: OutPoint, + value: Bytes, +} + +vector OutPointKeyValueVec ; + +table OutPointEdges { + inner: OutPointKeyValueVec, + outer: OutPointKeyValueVec, + deps: OutPointKeyValueVec, +} + +table PendingQueue { + inner: SortedTxMap, +} + +table ProposedPool{ + edges: OutPointEdges, + inner: SortedTxMap, +} + +table OrphanPool { + vertices: ProposalShortIdKeyValueVec, + edges: OutPointKeyValueVec, + prune_threshold: Uint32, +} + +table TxPool { + version: Uint32, + pending: PendingQueue, + gap: PendingQueue, + proposed: ProposedPool, + orphan: OrphanPool, + last_txs_updated_at: Uint64, + total_tx_size: Uint32, + total_tx_cycles: Uint64, +} + +table TxPoolMeta { + version: Uint32, +} diff --git a/tx-pool/schemas/referenced.mol b/tx-pool/schemas/referenced.mol new file mode 100644 index 0000000000..ef0ca89a6e --- /dev/null +++ b/tx-pool/schemas/referenced.mol @@ -0,0 +1,67 @@ +/* Referenced Types */ + +array Uint32 [byte; 4]; +array Uint64 [byte; 8]; + +array Byte32 [byte; 32]; +vector Byte32Vec ; + +vector Bytes ; +vector BytesVec ; + +array ProposalShortId [byte; 10]; + +vector ProposalShortIdVec ; +vector OutPointVec ; +vector CellDepVec ; +vector CellInputVec ; +vector CellOutputVec ; + +option ScriptOpt (Script); + +table Script { + code_hash: Byte32, + hash_type: byte, + args: Bytes, +} + +struct OutPoint { + tx_hash: Byte32, + index: Uint32, +} + +struct CellInput { + since: Uint64, + previous_output: OutPoint, +} + +table CellOutput { + capacity: Uint64, + lock: Script, + type_: ScriptOpt, +} + +struct CellDep { + out_point: OutPoint, + dep_type: byte, +} + +table RawTransaction { + version: Uint32, + cell_deps: CellDepVec, + header_deps: Byte32Vec, + inputs: CellInputVec, + outputs: CellOutputVec, + outputs_data: BytesVec, +} + +table Transaction { + raw: RawTransaction, + witnesses: BytesVec, +} + +table TransactionView { + hash: Byte32, + witness_hash: Byte32, + data: Transaction, +} diff --git a/tx-pool/src/lib.rs b/tx-pool/src/lib.rs index 7c84b2addf..1a8bac6bbb 100644 --- a/tx-pool/src/lib.rs +++ b/tx-pool/src/lib.rs @@ -6,6 +6,7 @@ mod callback; mod chunk_process; mod component; pub mod error; +mod persisted; pub mod pool; mod process; pub mod service; diff --git a/tx-pool/src/persisted/conversion.rs b/tx-pool/src/persisted/conversion.rs new file mode 100644 index 0000000000..aeed8b380c --- /dev/null +++ b/tx-pool/src/persisted/conversion.rs @@ -0,0 +1,428 @@ +use ckb_types::{bytes, packed, prelude::*}; +use std::collections::{HashMap, HashSet}; + +use crate::{component, persisted}; + +impl Pack for component::CacheEntry { + fn pack(&self) -> persisted::CacheEntry { + persisted::CacheEntry::new_builder() + .cycles(self.cycles.pack()) + .fee(self.fee.pack()) + .build() + } +} + +impl<'r> Unpack for persisted::CacheEntryReader<'r> { + fn unpack(&self) -> component::CacheEntry { + component::CacheEntry { + cycles: self.cycles().unpack(), + fee: self.fee().unpack(), + } + } +} + +impl Pack for component::TxEntry { + fn pack(&self) -> persisted::TxEntry { + persisted::TxEntry::new_builder() + .transaction(self.transaction.pack()) + .cycles(self.cycles.pack()) + .size(self.size.pack()) + .fee(self.fee.pack()) + .ancestors_size(self.ancestors_size.pack()) + .ancestors_fee(self.ancestors_fee.pack()) + .ancestors_cycles(self.ancestors_cycles.pack()) + .ancestors_count(self.ancestors_count.pack()) + .related_out_points(self.related_out_points.clone().pack()) + .build() + } +} + +impl<'r> Unpack for persisted::TxEntryReader<'r> { + fn unpack(&self) -> component::TxEntry { + let related_out_points = self + .related_out_points() + .iter() + .map(|op| op.to_entity()) + .collect(); + component::TxEntry { + transaction: self.transaction().unpack(), + cycles: self.cycles().unpack(), + size: self.size().unpack(), + fee: self.fee().unpack(), + ancestors_size: self.ancestors_size().unpack(), + ancestors_fee: self.ancestors_fee().unpack(), + ancestors_cycles: self.ancestors_cycles().unpack(), + ancestors_count: self.ancestors_count().unpack(), + related_out_points, + } + } +} + +impl Pack for component::DefectEntry { + fn pack(&self) -> persisted::DefectEntry { + let cache_entry = self + .cache_entry + .map(|inner| persisted::CacheEntryOpt::new_unchecked(inner.pack().as_bytes())) + .unwrap_or_else(Default::default); + persisted::DefectEntry::new_builder() + .transaction(self.transaction.pack()) + .refs_count(self.refs_count.pack()) + .cache_entry(cache_entry) + .size(self.size.pack()) + .timestamp(self.timestamp.pack()) + .build() + } +} + +impl<'r> Unpack for persisted::DefectEntryReader<'r> { + fn unpack(&self) -> component::DefectEntry { + component::DefectEntry { + transaction: self.transaction().unpack(), + refs_count: self.refs_count().unpack(), + cache_entry: self.cache_entry().to_opt().map(|x| x.unpack()), + size: self.size().unpack(), + timestamp: self.timestamp().unpack(), + } + } +} + +impl Pack for component::TxLink { + fn pack(&self) -> persisted::TxLink { + persisted::TxLink::new_builder() + .parents(self.parents.clone().into_iter().pack()) + .children(self.children.clone().into_iter().pack()) + .build() + } +} + +impl<'r> Unpack for persisted::TxLinkReader<'r> { + fn unpack(&self) -> component::TxLink { + component::TxLink { + parents: self.parents().to_entity().into_iter().collect(), + children: self.children().to_entity().into_iter().collect(), + } + } +} + +impl Pack for component::AncestorsScoreSortKey { + fn pack(&self) -> persisted::AncestorsScoreSortKey { + persisted::AncestorsScoreSortKey::new_builder() + .fee(self.fee.pack()) + .vbytes(self.vbytes.pack()) + .id(self.id.clone()) + .ancestors_fee(self.ancestors_fee.pack()) + .ancestors_vbytes(self.ancestors_vbytes.pack()) + .ancestors_size(self.ancestors_size.pack()) + .build() + } +} + +impl<'r> Unpack for persisted::AncestorsScoreSortKeyReader<'r> { + fn unpack(&self) -> component::AncestorsScoreSortKey { + component::AncestorsScoreSortKey { + fee: self.fee().unpack(), + vbytes: self.vbytes().unpack(), + id: self.id().to_entity(), + ancestors_fee: self.ancestors_fee().unpack(), + ancestors_vbytes: self.ancestors_vbytes().unpack(), + ancestors_size: self.ancestors_size().unpack(), + } + } +} + +impl Pack for (packed::ProposalShortId, bytes::Bytes) { + fn pack(&self) -> persisted::ProposalShortIdKeyValue { + let (ref key, ref value) = self; + persisted::ProposalShortIdKeyValue::new_builder() + .key(key.to_owned()) + .value(value.pack()) + .build() + } +} + +impl Pack + for HashMap +{ + fn pack(&self) -> persisted::ProposalShortIdKeyValueVec { + let items = self + .iter() + .map(|(key, value)| (key.to_owned(), value.pack().as_bytes()).pack()); + persisted::ProposalShortIdKeyValueVec::new_builder() + .extend(items) + .build() + } +} + +impl Pack + for HashMap +{ + fn pack(&self) -> persisted::ProposalShortIdKeyValueVec { + let items = self + .iter() + .map(|(key, value)| (key.to_owned(), value.pack().as_bytes()).pack()); + persisted::ProposalShortIdKeyValueVec::new_builder() + .extend(items) + .build() + } +} + +impl Pack + for HashMap +{ + fn pack(&self) -> persisted::ProposalShortIdKeyValueVec { + let items = self + .iter() + .map(|(key, value)| (key.to_owned(), value.pack().as_bytes()).pack()); + persisted::ProposalShortIdKeyValueVec::new_builder() + .extend(items) + .build() + } +} + +impl<'r> Unpack> + for persisted::ProposalShortIdKeyValueVecReader<'r> +{ + fn unpack(&self) -> HashMap { + self.iter() + .map(|p| { + let k = p.key().to_entity(); + let v = persisted::TxEntryReader::new_unchecked(p.value().raw_data()).unpack(); + (k, v) + }) + .collect() + } +} + +impl<'r> Unpack> + for persisted::ProposalShortIdKeyValueVecReader<'r> +{ + fn unpack(&self) -> HashMap { + self.iter() + .map(|p| { + let k = p.key().to_entity(); + let v = persisted::TxLinkReader::new_unchecked(p.value().raw_data()).unpack(); + (k, v) + }) + .collect() + } +} + +impl<'r> Unpack> + for persisted::ProposalShortIdKeyValueVecReader<'r> +{ + fn unpack(&self) -> HashMap { + self.iter() + .map(|p| { + let k = p.key().to_entity(); + let v = persisted::DefectEntryReader::new_unchecked(p.value().raw_data()).unpack(); + (k, v) + }) + .collect() + } +} + +impl Pack for component::SortedTxMap { + fn pack(&self) -> persisted::SortedTxMap { + let sorted_index = persisted::AncestorsScoreSortKeyVec::new_builder() + .set(self.sorted_index.iter().map(|v| v.pack()).collect()) + .build(); + persisted::SortedTxMap::new_builder() + .entries(self.entries.pack()) + .sorted_index(sorted_index) + .links(self.links.pack()) + .max_ancestors_count(self.max_ancestors_count.pack()) + .build() + } +} + +impl<'r> Unpack for persisted::SortedTxMapReader<'r> { + fn unpack(&self) -> component::SortedTxMap { + component::SortedTxMap { + entries: self.entries().unpack(), + sorted_index: self.sorted_index().iter().map(|op| op.unpack()).collect(), + links: self.links().unpack(), + max_ancestors_count: self.max_ancestors_count().unpack(), + } + } +} + +impl Pack for (packed::OutPoint, bytes::Bytes) { + fn pack(&self) -> persisted::OutPointKeyValue { + let (ref key, ref value) = self; + persisted::OutPointKeyValue::new_builder() + .key(key.to_owned()) + .value(value.pack()) + .build() + } +} + +impl Pack + for HashMap> +{ + fn pack(&self) -> persisted::OutPointKeyValueVec { + let items = self.iter().map(|(key, value)| { + let value_opt = value + .clone() + .map(|inner| persisted::ProposalShortIdOpt::new_unchecked(inner.as_bytes())) + .unwrap_or_else(Default::default); + (key.to_owned(), value_opt.as_bytes()).pack() + }); + persisted::OutPointKeyValueVec::new_builder() + .extend(items) + .build() + } +} + +impl Pack + for HashMap> +{ + fn pack(&self) -> persisted::OutPointKeyValueVec { + let items = self + .iter() + .map(|(key, value)| (key.to_owned(), value.clone().pack().as_bytes()).pack()); + persisted::OutPointKeyValueVec::new_builder() + .extend(items) + .build() + } +} + +impl Pack + for HashMap> +{ + fn pack(&self) -> persisted::OutPointKeyValueVec { + let items = self.iter().map(|(key, value)| { + (key.to_owned(), value.clone().into_iter().pack().as_bytes()).pack() + }); + persisted::OutPointKeyValueVec::new_builder() + .extend(items) + .build() + } +} + +impl<'r> Unpack>> + for persisted::OutPointKeyValueVecReader<'r> +{ + fn unpack(&self) -> HashMap> { + self.iter() + .map(|p| { + let k = p.key().to_entity(); + let v = persisted::ProposalShortIdOptReader::new_unchecked(p.value().raw_data()) + .to_entity() + .to_opt(); + (k, v) + }) + .collect() + } +} + +impl<'r> Unpack>> + for persisted::OutPointKeyValueVecReader<'r> +{ + fn unpack(&self) -> HashMap> { + self.iter() + .map(|p| { + let k = p.key().to_entity(); + let v = packed::ProposalShortIdVecReader::new_unchecked(p.value().raw_data()) + .to_entity() + .into_iter() + .collect(); + (k, v) + }) + .collect() + } +} + +impl<'r> Unpack>> + for persisted::OutPointKeyValueVecReader<'r> +{ + fn unpack(&self) -> HashMap> { + self.iter() + .map(|p| { + let k = p.key().to_entity(); + let v = packed::ProposalShortIdVecReader::new_unchecked(p.value().raw_data()) + .to_entity() + .into_iter() + .collect(); + (k, v) + }) + .collect() + } +} + +impl Pack + for component::Edges +{ + fn pack(&self) -> persisted::OutPointEdges { + persisted::OutPointEdges::new_builder() + .inner(self.inner.pack()) + .outer(self.outer.pack()) + .deps(self.deps.pack()) + .build() + } +} + +impl<'r> Unpack> + for persisted::OutPointEdgesReader<'r> +{ + fn unpack(&self) -> component::Edges { + component::Edges { + inner: self.inner().unpack(), + outer: self.outer().unpack(), + deps: self.deps().unpack(), + } + } +} + +impl Pack for component::PendingQueue { + fn pack(&self) -> persisted::PendingQueue { + persisted::PendingQueue::new_builder() + .inner(self.inner.pack()) + .build() + } +} + +impl<'r> Unpack for persisted::PendingQueueReader<'r> { + fn unpack(&self) -> component::PendingQueue { + component::PendingQueue { + inner: self.inner().unpack(), + } + } +} + +impl Pack for component::ProposedPool { + fn pack(&self) -> persisted::ProposedPool { + persisted::ProposedPool::new_builder() + .edges(self.edges.pack()) + .inner(self.inner.pack()) + .build() + } +} + +impl<'r> Unpack for persisted::ProposedPoolReader<'r> { + fn unpack(&self) -> component::ProposedPool { + component::ProposedPool { + edges: self.edges().unpack(), + inner: self.inner().unpack(), + } + } +} + +impl Pack for component::OrphanPool { + fn pack(&self) -> persisted::OrphanPool { + persisted::OrphanPool::new_builder() + .vertices(self.vertices.pack()) + .edges(self.edges.pack()) + .prune_threshold(self.prune_threshold.pack()) + .build() + } +} + +impl<'r> Unpack for persisted::OrphanPoolReader<'r> { + fn unpack(&self) -> component::OrphanPool { + component::OrphanPool { + vertices: self.vertices().unpack(), + edges: self.edges().unpack(), + prune_threshold: self.prune_threshold().unpack(), + } + } +} diff --git a/tx-pool/src/persisted/generated.rs b/tx-pool/src/persisted/generated.rs new file mode 100644 index 0000000000..d0f4285a50 --- /dev/null +++ b/tx-pool/src/persisted/generated.rs @@ -0,0 +1,9 @@ +//! Generated packed bytes wrappers. + +#![doc(hidden)] +#![allow(warnings)] +#![allow(clippy::all)] + +extern crate molecule; + +include!(concat!(env!("OUT_DIR"), "/", "persisted", ".rs")); diff --git a/tx-pool/src/persisted/mod.rs b/tx-pool/src/persisted/mod.rs new file mode 100644 index 0000000000..da40648a86 --- /dev/null +++ b/tx-pool/src/persisted/mod.rs @@ -0,0 +1,9 @@ +use ckb_types::packed as referenced; + +mod conversion; +mod generated; + +pub(crate) use generated::*; + +/// The version of the persisted data. +pub(crate) const VERSION: u32 = 1; diff --git a/tx-pool/src/pool.rs b/tx-pool/src/pool.rs index 818347e976..43884ed390 100644 --- a/tx-pool/src/pool.rs +++ b/tx-pool/src/pool.rs @@ -20,6 +20,7 @@ use ckb_types::{ Cycle, TransactionView, }, packed::{Byte32, OutPoint, ProposalShortId}, + prelude::*, }; use ckb_verification::{cache::CacheEntry, TxVerifyEnv}; use faketime::unix_time_as_millis; diff --git a/tx-pool/src/process.rs b/tx-pool/src/process.rs index 964f238d09..58627176f4 100644 --- a/tx-pool/src/process.rs +++ b/tx-pool/src/process.rs @@ -46,6 +46,7 @@ use std::sync::atomic::Ordering; use std::sync::{atomic::AtomicU64, Arc}; use std::time::Duration; use std::{cmp, iter}; +use std::{fs::OpenOptions, io::Write as _}; use tokio::task::block_in_place; /// A list for plug target for `plug_entry` method diff --git a/tx-pool/src/service.rs b/tx-pool/src/service.rs index 86eba0058d..e40b77e323 100644 --- a/tx-pool/src/service.rs +++ b/tx-pool/src/service.rs @@ -97,6 +97,7 @@ pub(crate) enum Message { ClearPool(Request, ()>), GetAllEntryInfo(Request<(), TxPoolEntryInfo>), GetAllIds(Request<(), TxPoolIds>), + CachePool(Request<(), Result<(), Error>>), } /// Controller to the tx-pool service. @@ -781,5 +782,11 @@ async fn process(mut service: TxPoolService, message: Message) { error!("responder send get_ids failed {:?}", e) }; } + Message::CachePool(Request { responder, .. }) => { + let result = service.cache_pool().await; + if let Err(e) = responder.send(result) { + error!("responder send cache_pool failed {:?}", e) + }; + } } } diff --git a/util/app-config/src/app_config.rs b/util/app-config/src/app_config.rs index 741e6edf2a..26ed32f479 100644 --- a/util/app-config/src/app_config.rs +++ b/util/app-config/src/app_config.rs @@ -286,6 +286,8 @@ impl CKBAppConfig { .log_dir .join(subcommand_name.to_string() + ".log"); + self.tx_pool.adjust(root_dir, &self.data_dir); + if subcommand_name == cli::CMD_RESET_DATA { return Ok(self); } diff --git a/util/app-config/src/configs/tx_pool.rs b/util/app-config/src/configs/tx_pool.rs index 670d2bfd13..00ca11fb9b 100644 --- a/util/app-config/src/configs/tx_pool.rs +++ b/util/app-config/src/configs/tx_pool.rs @@ -2,10 +2,11 @@ use ckb_jsonrpc_types::{FeeRateDef, JsonBytes, ScriptHashType}; use ckb_types::core::{Cycle, FeeRate}; use ckb_types::H256; use serde::{Deserialize, Serialize}; +use std::path::{Path, PathBuf}; // The default values are set in the legacy version. /// Transaction pool configuration -#[derive(Copy, Clone, Debug, Serialize)] +#[derive(Clone, Debug, Serialize, Deserialize)] pub struct TxPoolConfig { /// Keep the transaction pool below mb pub max_mem_size: usize, @@ -18,6 +19,24 @@ pub struct TxPoolConfig { pub max_tx_verify_cycles: Cycle, /// max ancestors size limit for a single tx pub max_ancestors_count: usize, + /// The file to cache the tx pool state when tx pool have been shutdown. + /// + /// By default, it is a file inside the data directory. + #[serde(default)] + pub state_file: PathBuf, +} + +impl Default for TxPoolConfig { + fn default() -> Self { + TxPoolConfig { + max_mem_size: 20_000_000, // 20mb + max_cycles: 200_000_000_000, + min_fee_rate: DEFAULT_MIN_FEE_RATE, + max_tx_verify_cycles: DEFAULT_MAX_TX_VERIFY_CYCLES, + max_ancestors_count: DEFAULT_MAX_ANCESTORS_COUNT, + state_file: Default::default(), + } + } } /// Block assembler config options. @@ -45,3 +64,19 @@ pub struct BlockAssemblerConfig { const fn default_use_binary_version_as_message_prefix() -> bool { true } + +impl TxPoolConfig { + /// Canonicalizes paths in the config options. + /// + /// If `self.state_file` is not set, set it to `data_dir / tx_pool.state`. + /// + /// If `self.path` is relative, convert them to absolute path using + /// `root_dir` as current working directory. + pub fn adjust>(&mut self, root_dir: &Path, data_dir: P) { + if self.state_file.to_str().is_none() || self.state_file.to_str() == Some("") { + self.state_file = data_dir.as_ref().to_path_buf().join("tx_pool.state"); + } else if self.state_file.is_relative() { + self.state_file = root_dir.to_path_buf().join(&self.state_file) + } + } +} From 5a02d1c4a993d4bc10f259b107939e9dcfff31a0 Mon Sep 17 00:00:00 2001 From: Boyu Yang Date: Fri, 19 Mar 2021 02:04:07 +0800 Subject: [PATCH 2/5] test: add an integration test to test tx-pool state cache --- test/Cargo.toml | 3 ++ test/src/main.rs | 2 + test/src/node.rs | 36 +++++++++++++--- test/src/specs/tx_pool/mod.rs | 4 ++ test/src/specs/tx_pool/pool_cache.rs | 61 ++++++++++++++++++++++++++++ 5 files changed, 100 insertions(+), 6 deletions(-) create mode 100644 test/src/specs/tx_pool/pool_cache.rs diff --git a/test/Cargo.toml b/test/Cargo.toml index 65b09fba2a..2c24d8fdaf 100644 --- a/test/Cargo.toml +++ b/test/Cargo.toml @@ -39,6 +39,9 @@ lazy_static = "1.4.0" byteorder = "1.3.1" jsonrpc-core = "18.0" +[target.'cfg(target_os="linux")'.dependencies] +nix = "0.20.0" + # Prevent this from interfering with workspaces [workspace] members = ["."] diff --git a/test/src/main.rs b/test/src/main.rs index c1a3658e06..eb47e6da80 100644 --- a/test/src/main.rs +++ b/test/src/main.rs @@ -382,6 +382,8 @@ fn all_specs() -> Vec> { Box::new(TemplateSizeLimit), Box::new(PoolReconcile), Box::new(PoolResurrect), + #[cfg(target_os = "linux")] + Box::new(PoolCache), Box::new(TransactionRelayBasic), Box::new(TransactionRelayLowFeeRate), // TODO failed on poor CI server diff --git a/test/src/node.rs b/test/src/node.rs index 88799f7345..6c74de8419 100644 --- a/test/src/node.rs +++ b/test/src/node.rs @@ -26,15 +26,20 @@ use std::process::{self, Child, Command, Stdio}; use std::thread::sleep; use std::time::{Duration, Instant}; -struct ProcessGuard(pub Child); +struct ProcessGuard { + pub child: Child, + pub killed: bool, +} impl Drop for ProcessGuard { fn drop(&mut self) { - match self.0.kill() { - Err(e) => error!("Could not kill ckb process: {}", e), - Ok(_) => debug!("Successfully killed ckb process"), + if !self.killed { + match self.child.kill() { + Err(e) => error!("Could not kill ckb process: {}", e), + Ok(_) => debug!("Successfully killed ckb process"), + } + let _ = self.child.wait(); } - let _ = self.0.wait(); } } @@ -532,7 +537,10 @@ impl Node { } }; - self.guard = Some(ProcessGuard(child_process)); + self.guard = Some(ProcessGuard { + child: child_process, + killed: false, + }); self.node_id = Some(node_info.node_id); } @@ -540,6 +548,22 @@ impl Node { drop(self.guard.take()) } + #[cfg(target_os = "linux")] + pub fn stop_gracefully(&mut self) { + if let Some(mut guard) = self.guard.take() { + if !guard.killed { + // send SIGINT to the child + nix::sys::signal::kill( + nix::unistd::Pid::from_raw(guard.child.id() as i32), + nix::sys::signal::Signal::SIGINT, + ) + .expect("cannot send ctrl-c"); + let _ = guard.child.wait(); + guard.killed = true; + } + } + } + pub fn export(&self, target: String) { Command::new(binary()) .args(&[ diff --git a/test/src/specs/tx_pool/mod.rs b/test/src/specs/tx_pool/mod.rs index 9488d8f6d2..d362967970 100644 --- a/test/src/specs/tx_pool/mod.rs +++ b/test/src/specs/tx_pool/mod.rs @@ -5,6 +5,8 @@ mod depend_tx_in_same_block; mod descendant; mod different_txs_with_same_input; mod limit; +#[cfg(target_os = "linux")] +mod pool_cache; mod pool_reconcile; mod pool_resurrect; mod proposal_expire_rule; @@ -27,6 +29,8 @@ pub use depend_tx_in_same_block::*; pub use descendant::*; pub use different_txs_with_same_input::*; pub use limit::*; +#[cfg(target_os = "linux")] +pub use pool_cache::*; pub use pool_reconcile::*; pub use pool_resurrect::*; pub use proposal_expire_rule::*; diff --git a/test/src/specs/tx_pool/pool_cache.rs b/test/src/specs/tx_pool/pool_cache.rs new file mode 100644 index 0000000000..03c725be4e --- /dev/null +++ b/test/src/specs/tx_pool/pool_cache.rs @@ -0,0 +1,61 @@ +use crate::util::mining::{mine, mine_until_out_bootstrap_period}; +use crate::{Node, Spec}; +use ckb_logger::info; + +pub struct PoolCache; + +impl Spec for PoolCache { + crate::setup!(num_nodes: 1); + + fn run(&self, nodes: &mut Vec) { + let node0 = &mut nodes[0]; + + info!("Generate 1 block on node0"); + mine_until_out_bootstrap_period(node0); + + info!("Generate 6 txs on node0"); + let mut txs_hash1 = Vec::new(); + let mut txs_hash2 = Vec::new(); + let mut hash = node0.generate_transaction(); + txs_hash1.push(hash.clone()); + + (0..5).for_each(|_| { + let tx = node0.new_transaction(hash.clone()); + hash = node0.rpc_client().send_transaction(tx.data().into()); + txs_hash1.push(hash.clone()); + }); + + info!("Generate 1 more blocks on node0"); + mine(node0, 1); + + info!("Generate 5 more txs on node0"); + (0..5).for_each(|_| { + let tx = node0.new_transaction(hash.clone()); + hash = node0.rpc_client().send_transaction(tx.data().into()); + txs_hash2.push(hash.clone()); + }); + + info!("Generate 1 more blocks on node0"); + mine(node0, 1); + + node0.wait_for_tx_pool(); + + let tx_pool_info_original = node0.get_tip_tx_pool_info(); + + info!("Stop node0 gracefully"); + node0.stop_gracefully(); + + info!("Start node0"); + node0.start(); + + let tx_pool_info_reloaded = node0.get_tip_tx_pool_info(); + info!("TxPool should be same as before"); + assert_eq!(tx_pool_info_original, tx_pool_info_reloaded); + + info!("Check the specific values of TxPool state"); + node0.assert_tx_pool_size(txs_hash2.len() as u64, txs_hash1.len() as u64); + assert!(tx_pool_info_reloaded.total_tx_size.value() > 0); + assert!(tx_pool_info_reloaded.total_tx_cycles.value() > 0); + assert!(tx_pool_info_reloaded.last_txs_updated_at.value() > 0); + } +} From 06bc208016898c1adfae79624959742a4e687a04 Mon Sep 17 00:00:00 2001 From: Boyu Yang Date: Thu, 8 Apr 2021 12:33:07 +0800 Subject: [PATCH 3/5] chore: optimize code structure and variable naming --- tx-pool/schemas/persisted.mol | 97 +------------------------- tx-pool/src/process.rs | 8 ++- tx-pool/src/service.rs | 31 ++++++-- util/app-config/src/configs/tx_pool.rs | 16 ++--- 4 files changed, 44 insertions(+), 108 deletions(-) diff --git a/tx-pool/schemas/persisted.mol b/tx-pool/schemas/persisted.mol index 2878b3f6e1..3c637fb80f 100644 --- a/tx-pool/schemas/persisted.mol +++ b/tx-pool/schemas/persisted.mol @@ -1,101 +1,8 @@ import referenced; -option ProposalShortIdOpt (ProposalShortId); - -struct CacheEntry { - cycles: Uint64, - fee: Uint64, -} - -option CacheEntryOpt (CacheEntry); - -table TxEntry { - transaction: TransactionView, - cycles: Uint64, - size: Uint32, - fee: Uint64, - ancestors_size: Uint32, - ancestors_fee: Uint64, - ancestors_cycles: Uint64, - ancestors_count: Uint32, - related_out_points: OutPointVec, -} - -table DefectEntry { - transaction: TransactionView, - refs_count: Uint32, - cache_entry: CacheEntryOpt, - size: Uint32, - timestamp: Uint64, -} - -table TxLink { - parents: ProposalShortIdVec, - children: ProposalShortIdVec, -} - -struct AncestorsScoreSortKey { - fee: Uint64, - vbytes: Uint64, - id: ProposalShortId, - ancestors_fee: Uint64, - ancestors_vbytes: Uint64, - ancestors_size: Uint32, -} - -vector AncestorsScoreSortKeyVec ; - -table ProposalShortIdKeyValue { - key: ProposalShortId, - value: Bytes, -} - -vector ProposalShortIdKeyValueVec ; - -table SortedTxMap { - entries: ProposalShortIdKeyValueVec, - sorted_index: AncestorsScoreSortKeyVec, - links: ProposalShortIdKeyValueVec, - max_ancestors_count: Uint32, -} - -table OutPointKeyValue { - key: OutPoint, - value: Bytes, -} - -vector OutPointKeyValueVec ; - -table OutPointEdges { - inner: OutPointKeyValueVec, - outer: OutPointKeyValueVec, - deps: OutPointKeyValueVec, -} - -table PendingQueue { - inner: SortedTxMap, -} - -table ProposedPool{ - edges: OutPointEdges, - inner: SortedTxMap, -} - -table OrphanPool { - vertices: ProposalShortIdKeyValueVec, - edges: OutPointKeyValueVec, - prune_threshold: Uint32, -} - -table TxPool { +table TxPool{ version: Uint32, - pending: PendingQueue, - gap: PendingQueue, - proposed: ProposedPool, - orphan: OrphanPool, - last_txs_updated_at: Uint64, - total_tx_size: Uint32, - total_tx_cycles: Uint64, + transactions: TransactionVec, } table TxPoolMeta { diff --git a/tx-pool/src/process.rs b/tx-pool/src/process.rs index 58627176f4..24102371c5 100644 --- a/tx-pool/src/process.rs +++ b/tx-pool/src/process.rs @@ -46,7 +46,6 @@ use std::sync::atomic::Ordering; use std::sync::{atomic::AtomicU64, Arc}; use std::time::Duration; use std::{cmp, iter}; -use std::{fs::OpenOptions, io::Write as _}; use tokio::task::block_in_place; /// A list for plug target for `plug_entry` method @@ -987,6 +986,13 @@ impl TxPoolService { self.last_txs_updated_at = Arc::new(AtomicU64::new(0)); *tx_pool = TxPool::new(config, new_snapshot, Arc::clone(&self.last_txs_updated_at)); } + + pub(crate) async fn save_pool(&self) -> Result<(), AnyError> { + let tx_pool = self.tx_pool.read().await; + tx_pool + .persisted_data() + .save_into_file(&tx_pool.config.persisted_data) + } } type PreCheckedTx = ( diff --git a/tx-pool/src/service.rs b/tx-pool/src/service.rs index e40b77e323..4af4fed843 100644 --- a/tx-pool/src/service.rs +++ b/tx-pool/src/service.rs @@ -97,7 +97,7 @@ pub(crate) enum Message { ClearPool(Request, ()>), GetAllEntryInfo(Request<(), TxPoolEntryInfo>), GetAllIds(Request<(), TxPoolIds>), - CachePool(Request<(), Result<(), Error>>), + SavePool(Request<(), Result<(), AnyError>>), } /// Controller to the tx-pool service. @@ -418,6 +418,29 @@ impl TxPoolController { .map_err(handle_send_cmd_error) .map_err(Into::into) } + + fn load_persisted_data(&self, data: persisted::TxPool) -> Result<(), AnyError> { + // a trick to commit transactions with the correct order + let mut remain_size = data.transactions().len(); + let mut txs_next_turn = Vec::new(); + for tx in data.transactions() { + let tx_view = tx.into_view(); + if self.submit_local_tx(tx_view.clone())?.is_err() { + txs_next_turn.push(tx_view); + } + } + while !txs_next_turn.is_empty() && remain_size != txs_next_turn.len() { + remain_size = txs_next_turn.len(); + let mut txs_failed = Vec::new(); + for tx in txs_next_turn { + if self.submit_local_tx(tx.clone())?.is_err() { + txs_failed.push(tx); + } + } + txs_next_turn = txs_failed; + } + Ok(()) + } } /// A builder used to create TxPoolService. @@ -782,10 +805,10 @@ async fn process(mut service: TxPoolService, message: Message) { error!("responder send get_ids failed {:?}", e) }; } - Message::CachePool(Request { responder, .. }) => { - let result = service.cache_pool().await; + Message::SavePool(Request { responder, .. }) => { + let result = service.save_pool().await; if let Err(e) = responder.send(result) { - error!("responder send cache_pool failed {:?}", e) + error!("responder send save_pool failed {:?}", e) }; } } diff --git a/util/app-config/src/configs/tx_pool.rs b/util/app-config/src/configs/tx_pool.rs index 00ca11fb9b..b9a5dc3397 100644 --- a/util/app-config/src/configs/tx_pool.rs +++ b/util/app-config/src/configs/tx_pool.rs @@ -19,11 +19,11 @@ pub struct TxPoolConfig { pub max_tx_verify_cycles: Cycle, /// max ancestors size limit for a single tx pub max_ancestors_count: usize, - /// The file to cache the tx pool state when tx pool have been shutdown. + /// The file to persist the tx pool on the disk when tx pool have been shutdown. /// /// By default, it is a file inside the data directory. #[serde(default)] - pub state_file: PathBuf, + pub persisted_data: PathBuf, } impl Default for TxPoolConfig { @@ -34,7 +34,7 @@ impl Default for TxPoolConfig { min_fee_rate: DEFAULT_MIN_FEE_RATE, max_tx_verify_cycles: DEFAULT_MAX_TX_VERIFY_CYCLES, max_ancestors_count: DEFAULT_MAX_ANCESTORS_COUNT, - state_file: Default::default(), + persisted_data: Default::default(), } } } @@ -68,15 +68,15 @@ const fn default_use_binary_version_as_message_prefix() -> bool { impl TxPoolConfig { /// Canonicalizes paths in the config options. /// - /// If `self.state_file` is not set, set it to `data_dir / tx_pool.state`. + /// If `self.persisted_data` is not set, set it to `data_dir / tx_pool.dat`. /// /// If `self.path` is relative, convert them to absolute path using /// `root_dir` as current working directory. pub fn adjust>(&mut self, root_dir: &Path, data_dir: P) { - if self.state_file.to_str().is_none() || self.state_file.to_str() == Some("") { - self.state_file = data_dir.as_ref().to_path_buf().join("tx_pool.state"); - } else if self.state_file.is_relative() { - self.state_file = root_dir.to_path_buf().join(&self.state_file) + if self.persisted_data.to_str().is_none() || self.persisted_data.to_str() == Some("") { + self.persisted_data = data_dir.as_ref().to_path_buf().join("tx_pool.dat"); + } else if self.persisted_data.is_relative() { + self.persisted_data = root_dir.to_path_buf().join(&self.persisted_data) } } } From d60d154052c73b156416d5c4b01d74c2b1c5cdb2 Mon Sep 17 00:00:00 2001 From: Boyu Yang Date: Mon, 19 Apr 2021 15:57:38 +0800 Subject: [PATCH 4/5] chore: update according to the review suggestions --- test/src/main.rs | 2 +- test/src/specs/tx_pool/mod.rs | 4 ++-- test/src/specs/tx_pool/{pool_cache.rs => pool_persisted.rs} | 4 ++-- tx-pool/src/persisted/mod.rs | 2 +- 4 files changed, 6 insertions(+), 6 deletions(-) rename test/src/specs/tx_pool/{pool_cache.rs => pool_persisted.rs} (97%) diff --git a/test/src/main.rs b/test/src/main.rs index eb47e6da80..2a0b92121b 100644 --- a/test/src/main.rs +++ b/test/src/main.rs @@ -383,7 +383,7 @@ fn all_specs() -> Vec> { Box::new(PoolReconcile), Box::new(PoolResurrect), #[cfg(target_os = "linux")] - Box::new(PoolCache), + Box::new(PoolPersisted), Box::new(TransactionRelayBasic), Box::new(TransactionRelayLowFeeRate), // TODO failed on poor CI server diff --git a/test/src/specs/tx_pool/mod.rs b/test/src/specs/tx_pool/mod.rs index d362967970..552a408e6d 100644 --- a/test/src/specs/tx_pool/mod.rs +++ b/test/src/specs/tx_pool/mod.rs @@ -6,7 +6,7 @@ mod descendant; mod different_txs_with_same_input; mod limit; #[cfg(target_os = "linux")] -mod pool_cache; +mod pool_persisted; mod pool_reconcile; mod pool_resurrect; mod proposal_expire_rule; @@ -30,7 +30,7 @@ pub use descendant::*; pub use different_txs_with_same_input::*; pub use limit::*; #[cfg(target_os = "linux")] -pub use pool_cache::*; +pub use pool_persisted::*; pub use pool_reconcile::*; pub use pool_resurrect::*; pub use proposal_expire_rule::*; diff --git a/test/src/specs/tx_pool/pool_cache.rs b/test/src/specs/tx_pool/pool_persisted.rs similarity index 97% rename from test/src/specs/tx_pool/pool_cache.rs rename to test/src/specs/tx_pool/pool_persisted.rs index 03c725be4e..58ba033c4b 100644 --- a/test/src/specs/tx_pool/pool_cache.rs +++ b/test/src/specs/tx_pool/pool_persisted.rs @@ -2,9 +2,9 @@ use crate::util::mining::{mine, mine_until_out_bootstrap_period}; use crate::{Node, Spec}; use ckb_logger::info; -pub struct PoolCache; +pub struct PoolPersisted; -impl Spec for PoolCache { +impl Spec for PoolPersisted { crate::setup!(num_nodes: 1); fn run(&self, nodes: &mut Vec) { diff --git a/tx-pool/src/persisted/mod.rs b/tx-pool/src/persisted/mod.rs index da40648a86..667817a4c3 100644 --- a/tx-pool/src/persisted/mod.rs +++ b/tx-pool/src/persisted/mod.rs @@ -5,5 +5,5 @@ mod generated; pub(crate) use generated::*; -/// The version of the persisted data. +/// The version of the persisted tx-pool data. pub(crate) const VERSION: u32 = 1; From b98ababf46544db785e7707d54e7585aced7c28c Mon Sep 17 00:00:00 2001 From: quake Date: Tue, 20 Apr 2021 16:26:13 +0900 Subject: [PATCH 5/5] refactor: use TransactionVec to persist tx pool --- Cargo.lock | 83 ----- ckb-bin/src/subcommand/replay.rs | 2 +- ckb-bin/src/subcommand/run.rs | 16 +- test/Cargo.toml | 2 +- test/src/main.rs | 2 +- test/src/node.rs | 2 +- test/src/specs/tx_pool/mod.rs | 4 +- test/src/specs/tx_pool/pool_persisted.rs | 28 +- tx-pool/build.rs | 18 - tx-pool/schemas/persisted.mol | 10 - tx-pool/schemas/referenced.mol | 67 ---- tx-pool/src/persisted.rs | 97 +++++ tx-pool/src/persisted/conversion.rs | 428 ----------------------- tx-pool/src/persisted/generated.rs | 9 - tx-pool/src/persisted/mod.rs | 9 - tx-pool/src/pool.rs | 3 +- tx-pool/src/process.rs | 12 +- tx-pool/src/service.rs | 102 ++++-- util/app-config/src/configs/tx_pool.rs | 22 +- util/app-config/src/legacy/tx_pool.rs | 6 + util/launcher/src/lib.rs | 2 +- 21 files changed, 209 insertions(+), 715 deletions(-) delete mode 100644 tx-pool/build.rs delete mode 100644 tx-pool/schemas/persisted.mol delete mode 100644 tx-pool/schemas/referenced.mol create mode 100644 tx-pool/src/persisted.rs delete mode 100644 tx-pool/src/persisted/conversion.rs delete mode 100644 tx-pool/src/persisted/generated.rs delete mode 100644 tx-pool/src/persisted/mod.rs diff --git a/Cargo.lock b/Cargo.lock index 718a9f5742..660f266f3e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -279,12 +279,6 @@ dependencies = [ "serde", ] -[[package]] -name = "case" -version = "1.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fd6c0e7b807d60291f42f33f58480c0bfafe28ed08286446f45e463728cf9c1c" - [[package]] name = "cast" version = "0.2.2" @@ -1276,14 +1270,8 @@ dependencies = [ "ckb-verification", "faketime", "lru", -<<<<<<< HEAD "sentry", "tokio", -======= - "molecule", - "molecule-codegen", - "tokio 0.2.25", ->>>>>>> feat: cache tx-pool state into a file when it has been shutdown ] [[package]] @@ -2875,12 +2863,6 @@ dependencies = [ "libc", ] -[[package]] -name = "maplit" -version = "1.0.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3e2e65a1a2e43cfcb47a895c4c8b10d1f4a61097f9f254f183aee60cad9c651d" - [[package]] name = "mapr" version = "0.8.0" @@ -3029,23 +3011,6 @@ dependencies = [ "faster-hex", ] -[[package]] -name = "molecule-codegen" -version = "0.6.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fb688831cecb451c0cdb1aba6b429cac07eec8812becf3d1ca80d071f0f29707" -dependencies = [ - "case", - "molecule", - "pest", - "pest_derive", - "proc-macro2", - "property", - "quote", - "same-file", - "semver", -] - [[package]] name = "native-tls" version = "0.2.7" @@ -3425,43 +3390,6 @@ dependencies = [ ] [[package]] -<<<<<<< HEAD -======= -name = "pest_derive" -version = "2.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "833d1ae558dc601e9a60366421196a8d94bc0ac980476d0b67e1d0988d72b2d0" -dependencies = [ - "pest", - "pest_generator", -] - -[[package]] -name = "pest_generator" -version = "2.1.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "99b8db626e31e5b81787b9783425769681b347011cc59471e33ea46d2ea0cf55" -dependencies = [ - "pest", - "pest_meta", - "proc-macro2", - "quote", - "syn", -] - -[[package]] -name = "pest_meta" -version = "2.1.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "54be6e404f5317079812fc8f9f5279de376d8856929e21c184ecf6bbd692a11d" -dependencies = [ - "maplit", - "pest", - "sha-1", -] - -[[package]] ->>>>>>> feat: cache tx-pool state into a file when it has been shutdown name = "petgraph" version = "0.5.1" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -3645,7 +3573,6 @@ dependencies = [ ] [[package]] -<<<<<<< HEAD name = "prometheus" version = "0.12.0" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -3658,16 +3585,6 @@ dependencies = [ "parking_lot", "protobuf", "thiserror", -======= -name = "property" -version = "0.3.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "889bff6ebe76dbef2685598944997eb7f15c9854a2e34db853a51c6ac158b9d5" -dependencies = [ - "proc-macro2", - "quote", - "syn", ->>>>>>> feat: cache tx-pool state into a file when it has been shutdown ] [[package]] diff --git a/ckb-bin/src/subcommand/replay.rs b/ckb-bin/src/subcommand/replay.rs index 42bc0080de..51333b0fa2 100644 --- a/ckb-bin/src/subcommand/replay.rs +++ b/ckb-bin/src/subcommand/replay.rs @@ -19,7 +19,7 @@ pub fn replay(args: ReplayArgs, async_handle: Handle) -> Result<(), ExitCode> { )?; let (shared, _) = shared_builder .consensus(args.consensus.clone()) - .tx_pool_config(args.config.tx_pool) + .tx_pool_config(args.config.tx_pool.clone()) .build()?; if !args.tmp_target.is_dir() { diff --git a/ckb-bin/src/subcommand/run.rs b/ckb-bin/src/subcommand/run.rs index 24db512490..dea925f5f2 100644 --- a/ckb-bin/src/subcommand/run.rs +++ b/ckb-bin/src/subcommand/run.rs @@ -63,18 +63,10 @@ pub fn run(args: RunArgs, version: Version, async_handle: Handle) -> Result<(), exit_handler.wait_for_exit(); info!("Finishing work, please wait..."); + shared.tx_pool_controller().save_pool().map_err(|err| { + eprintln!("TxPool Error: {}", err); + ExitCode::Failure + })?; drop(rpc_server); - drop(network_controller); - shared - .tx_pool_controller() - .persist_tx_pool() - .map_err(|err| { - eprintln!("TxPool Error: {}", err); - ExitCode::Failure - })? - .map_err(|err| { - eprintln!("TxPool Error: {}", err); - ExitCode::Failure - })?; Ok(()) } diff --git a/test/Cargo.toml b/test/Cargo.toml index 2c24d8fdaf..b460451d58 100644 --- a/test/Cargo.toml +++ b/test/Cargo.toml @@ -39,7 +39,7 @@ lazy_static = "1.4.0" byteorder = "1.3.1" jsonrpc-core = "18.0" -[target.'cfg(target_os="linux")'.dependencies] +[target.'cfg(not(target_os="windows"))'.dependencies] nix = "0.20.0" # Prevent this from interfering with workspaces diff --git a/test/src/main.rs b/test/src/main.rs index 2a0b92121b..3e1c95dacf 100644 --- a/test/src/main.rs +++ b/test/src/main.rs @@ -382,7 +382,7 @@ fn all_specs() -> Vec> { Box::new(TemplateSizeLimit), Box::new(PoolReconcile), Box::new(PoolResurrect), - #[cfg(target_os = "linux")] + #[cfg(not(target_os = "windows"))] Box::new(PoolPersisted), Box::new(TransactionRelayBasic), Box::new(TransactionRelayLowFeeRate), diff --git a/test/src/node.rs b/test/src/node.rs index 6c74de8419..88e1b395b9 100644 --- a/test/src/node.rs +++ b/test/src/node.rs @@ -548,7 +548,7 @@ impl Node { drop(self.guard.take()) } - #[cfg(target_os = "linux")] + #[cfg(not(target_os = "windows"))] pub fn stop_gracefully(&mut self) { if let Some(mut guard) = self.guard.take() { if !guard.killed { diff --git a/test/src/specs/tx_pool/mod.rs b/test/src/specs/tx_pool/mod.rs index 552a408e6d..a56c3a1504 100644 --- a/test/src/specs/tx_pool/mod.rs +++ b/test/src/specs/tx_pool/mod.rs @@ -5,7 +5,7 @@ mod depend_tx_in_same_block; mod descendant; mod different_txs_with_same_input; mod limit; -#[cfg(target_os = "linux")] +#[cfg(not(target_os = "windows"))] mod pool_persisted; mod pool_reconcile; mod pool_resurrect; @@ -29,7 +29,7 @@ pub use depend_tx_in_same_block::*; pub use descendant::*; pub use different_txs_with_same_input::*; pub use limit::*; -#[cfg(target_os = "linux")] +#[cfg(not(target_os = "windows"))] pub use pool_persisted::*; pub use pool_reconcile::*; pub use pool_resurrect::*; diff --git a/test/src/specs/tx_pool/pool_persisted.rs b/test/src/specs/tx_pool/pool_persisted.rs index 58ba033c4b..c19fc9b45e 100644 --- a/test/src/specs/tx_pool/pool_persisted.rs +++ b/test/src/specs/tx_pool/pool_persisted.rs @@ -14,15 +14,11 @@ impl Spec for PoolPersisted { mine_until_out_bootstrap_period(node0); info!("Generate 6 txs on node0"); - let mut txs_hash1 = Vec::new(); - let mut txs_hash2 = Vec::new(); let mut hash = node0.generate_transaction(); - txs_hash1.push(hash.clone()); (0..5).for_each(|_| { let tx = node0.new_transaction(hash.clone()); hash = node0.rpc_client().send_transaction(tx.data().into()); - txs_hash1.push(hash.clone()); }); info!("Generate 1 more blocks on node0"); @@ -32,7 +28,6 @@ impl Spec for PoolPersisted { (0..5).for_each(|_| { let tx = node0.new_transaction(hash.clone()); hash = node0.rpc_client().send_transaction(tx.data().into()); - txs_hash2.push(hash.clone()); }); info!("Generate 1 more blocks on node0"); @@ -50,12 +45,21 @@ impl Spec for PoolPersisted { let tx_pool_info_reloaded = node0.get_tip_tx_pool_info(); info!("TxPool should be same as before"); - assert_eq!(tx_pool_info_original, tx_pool_info_reloaded); - - info!("Check the specific values of TxPool state"); - node0.assert_tx_pool_size(txs_hash2.len() as u64, txs_hash1.len() as u64); - assert!(tx_pool_info_reloaded.total_tx_size.value() > 0); - assert!(tx_pool_info_reloaded.total_tx_cycles.value() > 0); - assert!(tx_pool_info_reloaded.last_txs_updated_at.value() > 0); + info!("tx_pool_info_original: {:?}", tx_pool_info_original); + info!("tx_pool_info_reloaded: {:?}", tx_pool_info_reloaded); + assert_eq!( + tx_pool_info_original.proposed, + tx_pool_info_reloaded.proposed + ); + assert_eq!(tx_pool_info_original.orphan, tx_pool_info_reloaded.orphan); + assert_eq!(tx_pool_info_original.pending, tx_pool_info_reloaded.pending); + assert_eq!( + tx_pool_info_original.total_tx_size, + tx_pool_info_reloaded.total_tx_size + ); + assert_eq!( + tx_pool_info_original.total_tx_cycles, + tx_pool_info_reloaded.total_tx_cycles + ); } } diff --git a/tx-pool/build.rs b/tx-pool/build.rs deleted file mode 100644 index 70f6fc00dd..0000000000 --- a/tx-pool/build.rs +++ /dev/null @@ -1,18 +0,0 @@ -#![allow(missing_docs)] - -use molecule_codegen::{Compiler, Language}; - -fn compile_schema(schema: &str) { - println!("cargo:rerun-if-changed={}", schema); - let mut compiler = Compiler::new(); - compiler - .input_schema_file(schema) - .generate_code(Language::Rust) - .output_dir_set_default() - .run() - .unwrap(); -} - -fn main() { - compile_schema("schemas/persisted.mol"); -} diff --git a/tx-pool/schemas/persisted.mol b/tx-pool/schemas/persisted.mol deleted file mode 100644 index 3c637fb80f..0000000000 --- a/tx-pool/schemas/persisted.mol +++ /dev/null @@ -1,10 +0,0 @@ -import referenced; - -table TxPool{ - version: Uint32, - transactions: TransactionVec, -} - -table TxPoolMeta { - version: Uint32, -} diff --git a/tx-pool/schemas/referenced.mol b/tx-pool/schemas/referenced.mol deleted file mode 100644 index ef0ca89a6e..0000000000 --- a/tx-pool/schemas/referenced.mol +++ /dev/null @@ -1,67 +0,0 @@ -/* Referenced Types */ - -array Uint32 [byte; 4]; -array Uint64 [byte; 8]; - -array Byte32 [byte; 32]; -vector Byte32Vec ; - -vector Bytes ; -vector BytesVec ; - -array ProposalShortId [byte; 10]; - -vector ProposalShortIdVec ; -vector OutPointVec ; -vector CellDepVec ; -vector CellInputVec ; -vector CellOutputVec ; - -option ScriptOpt (Script); - -table Script { - code_hash: Byte32, - hash_type: byte, - args: Bytes, -} - -struct OutPoint { - tx_hash: Byte32, - index: Uint32, -} - -struct CellInput { - since: Uint64, - previous_output: OutPoint, -} - -table CellOutput { - capacity: Uint64, - lock: Script, - type_: ScriptOpt, -} - -struct CellDep { - out_point: OutPoint, - dep_type: byte, -} - -table RawTransaction { - version: Uint32, - cell_deps: CellDepVec, - header_deps: Byte32Vec, - inputs: CellInputVec, - outputs: CellOutputVec, - outputs_data: BytesVec, -} - -table Transaction { - raw: RawTransaction, - witnesses: BytesVec, -} - -table TransactionView { - hash: Byte32, - witness_hash: Byte32, - data: Transaction, -} diff --git a/tx-pool/src/persisted.rs b/tx-pool/src/persisted.rs new file mode 100644 index 0000000000..6c5ae7f334 --- /dev/null +++ b/tx-pool/src/persisted.rs @@ -0,0 +1,97 @@ +use crate::TxPool; +use ckb_error::{AnyError, OtherError}; +use ckb_types::{ + core::TransactionView, + packed::{TransactionVec, TransactionVecReader}, + prelude::*, +}; +use std::{ + fs::OpenOptions, + io::{Read as _, Write as _}, +}; + +/// The version of the persisted tx-pool data. +pub(crate) const VERSION: u32 = 1; + +impl TxPool { + pub(crate) fn load_from_file(&self) -> Result, AnyError> { + let mut persisted_data_file = self.config.persisted_data.clone(); + persisted_data_file.set_extension(format!("v{}", VERSION)); + + if persisted_data_file.exists() { + let mut file = OpenOptions::new() + .read(true) + .open(&persisted_data_file) + .map_err(|err| { + let errmsg = format!( + "Failed to open the tx-pool persisted data file [{:?}], cause: {}", + persisted_data_file, err + ); + OtherError::new(errmsg) + })?; + let mut buffer = Vec::new(); + file.read_to_end(&mut buffer).map_err(|err| { + let errmsg = format!( + "Failed to read the tx-pool persisted data file [{:?}], cause: {}", + persisted_data_file, err + ); + OtherError::new(errmsg) + })?; + + let persisted_data = TransactionVecReader::from_slice(&buffer) + .map_err(|err| { + let errmsg = format!( + "The tx-pool persisted data file [{:?}] is broken, cause: {}", + persisted_data_file, err + ); + OtherError::new(errmsg) + })? + .to_entity(); + + Ok(persisted_data + .into_iter() + .map(|tx| tx.into_view()) + .collect()) + } else { + Ok(Vec::new()) + } + } + + pub(crate) fn save_into_file(&mut self) -> Result<(), AnyError> { + let mut persisted_data_file = self.config.persisted_data.clone(); + persisted_data_file.set_extension(format!("v{}", VERSION)); + + let mut file = OpenOptions::new() + .create(true) + .write(true) + .truncate(true) + .open(&persisted_data_file) + .map_err(|err| { + let errmsg = format!( + "Failed to open the tx-pool persisted data file [{:?}], cause: {}", + persisted_data_file, err + ); + OtherError::new(errmsg) + })?; + + let txs = TransactionVec::new_builder() + .extend(self.drain_all_transactions().iter().map(|tx| tx.data())) + .build(); + + file.write_all(txs.as_slice()).map_err(|err| { + let errmsg = format!( + "Failed to write the tx-pool persisted data into file [{:?}], cause: {}", + persisted_data_file, err + ); + OtherError::new(errmsg) + })?; + file.sync_all().map_err(|err| { + let errmsg = format!( + "Failed to sync the tx-pool persisted data file [{:?}], cause: {}", + persisted_data_file, err + ); + OtherError::new(errmsg) + })?; + Ok(()) + } +} diff --git a/tx-pool/src/persisted/conversion.rs b/tx-pool/src/persisted/conversion.rs deleted file mode 100644 index aeed8b380c..0000000000 --- a/tx-pool/src/persisted/conversion.rs +++ /dev/null @@ -1,428 +0,0 @@ -use ckb_types::{bytes, packed, prelude::*}; -use std::collections::{HashMap, HashSet}; - -use crate::{component, persisted}; - -impl Pack for component::CacheEntry { - fn pack(&self) -> persisted::CacheEntry { - persisted::CacheEntry::new_builder() - .cycles(self.cycles.pack()) - .fee(self.fee.pack()) - .build() - } -} - -impl<'r> Unpack for persisted::CacheEntryReader<'r> { - fn unpack(&self) -> component::CacheEntry { - component::CacheEntry { - cycles: self.cycles().unpack(), - fee: self.fee().unpack(), - } - } -} - -impl Pack for component::TxEntry { - fn pack(&self) -> persisted::TxEntry { - persisted::TxEntry::new_builder() - .transaction(self.transaction.pack()) - .cycles(self.cycles.pack()) - .size(self.size.pack()) - .fee(self.fee.pack()) - .ancestors_size(self.ancestors_size.pack()) - .ancestors_fee(self.ancestors_fee.pack()) - .ancestors_cycles(self.ancestors_cycles.pack()) - .ancestors_count(self.ancestors_count.pack()) - .related_out_points(self.related_out_points.clone().pack()) - .build() - } -} - -impl<'r> Unpack for persisted::TxEntryReader<'r> { - fn unpack(&self) -> component::TxEntry { - let related_out_points = self - .related_out_points() - .iter() - .map(|op| op.to_entity()) - .collect(); - component::TxEntry { - transaction: self.transaction().unpack(), - cycles: self.cycles().unpack(), - size: self.size().unpack(), - fee: self.fee().unpack(), - ancestors_size: self.ancestors_size().unpack(), - ancestors_fee: self.ancestors_fee().unpack(), - ancestors_cycles: self.ancestors_cycles().unpack(), - ancestors_count: self.ancestors_count().unpack(), - related_out_points, - } - } -} - -impl Pack for component::DefectEntry { - fn pack(&self) -> persisted::DefectEntry { - let cache_entry = self - .cache_entry - .map(|inner| persisted::CacheEntryOpt::new_unchecked(inner.pack().as_bytes())) - .unwrap_or_else(Default::default); - persisted::DefectEntry::new_builder() - .transaction(self.transaction.pack()) - .refs_count(self.refs_count.pack()) - .cache_entry(cache_entry) - .size(self.size.pack()) - .timestamp(self.timestamp.pack()) - .build() - } -} - -impl<'r> Unpack for persisted::DefectEntryReader<'r> { - fn unpack(&self) -> component::DefectEntry { - component::DefectEntry { - transaction: self.transaction().unpack(), - refs_count: self.refs_count().unpack(), - cache_entry: self.cache_entry().to_opt().map(|x| x.unpack()), - size: self.size().unpack(), - timestamp: self.timestamp().unpack(), - } - } -} - -impl Pack for component::TxLink { - fn pack(&self) -> persisted::TxLink { - persisted::TxLink::new_builder() - .parents(self.parents.clone().into_iter().pack()) - .children(self.children.clone().into_iter().pack()) - .build() - } -} - -impl<'r> Unpack for persisted::TxLinkReader<'r> { - fn unpack(&self) -> component::TxLink { - component::TxLink { - parents: self.parents().to_entity().into_iter().collect(), - children: self.children().to_entity().into_iter().collect(), - } - } -} - -impl Pack for component::AncestorsScoreSortKey { - fn pack(&self) -> persisted::AncestorsScoreSortKey { - persisted::AncestorsScoreSortKey::new_builder() - .fee(self.fee.pack()) - .vbytes(self.vbytes.pack()) - .id(self.id.clone()) - .ancestors_fee(self.ancestors_fee.pack()) - .ancestors_vbytes(self.ancestors_vbytes.pack()) - .ancestors_size(self.ancestors_size.pack()) - .build() - } -} - -impl<'r> Unpack for persisted::AncestorsScoreSortKeyReader<'r> { - fn unpack(&self) -> component::AncestorsScoreSortKey { - component::AncestorsScoreSortKey { - fee: self.fee().unpack(), - vbytes: self.vbytes().unpack(), - id: self.id().to_entity(), - ancestors_fee: self.ancestors_fee().unpack(), - ancestors_vbytes: self.ancestors_vbytes().unpack(), - ancestors_size: self.ancestors_size().unpack(), - } - } -} - -impl Pack for (packed::ProposalShortId, bytes::Bytes) { - fn pack(&self) -> persisted::ProposalShortIdKeyValue { - let (ref key, ref value) = self; - persisted::ProposalShortIdKeyValue::new_builder() - .key(key.to_owned()) - .value(value.pack()) - .build() - } -} - -impl Pack - for HashMap -{ - fn pack(&self) -> persisted::ProposalShortIdKeyValueVec { - let items = self - .iter() - .map(|(key, value)| (key.to_owned(), value.pack().as_bytes()).pack()); - persisted::ProposalShortIdKeyValueVec::new_builder() - .extend(items) - .build() - } -} - -impl Pack - for HashMap -{ - fn pack(&self) -> persisted::ProposalShortIdKeyValueVec { - let items = self - .iter() - .map(|(key, value)| (key.to_owned(), value.pack().as_bytes()).pack()); - persisted::ProposalShortIdKeyValueVec::new_builder() - .extend(items) - .build() - } -} - -impl Pack - for HashMap -{ - fn pack(&self) -> persisted::ProposalShortIdKeyValueVec { - let items = self - .iter() - .map(|(key, value)| (key.to_owned(), value.pack().as_bytes()).pack()); - persisted::ProposalShortIdKeyValueVec::new_builder() - .extend(items) - .build() - } -} - -impl<'r> Unpack> - for persisted::ProposalShortIdKeyValueVecReader<'r> -{ - fn unpack(&self) -> HashMap { - self.iter() - .map(|p| { - let k = p.key().to_entity(); - let v = persisted::TxEntryReader::new_unchecked(p.value().raw_data()).unpack(); - (k, v) - }) - .collect() - } -} - -impl<'r> Unpack> - for persisted::ProposalShortIdKeyValueVecReader<'r> -{ - fn unpack(&self) -> HashMap { - self.iter() - .map(|p| { - let k = p.key().to_entity(); - let v = persisted::TxLinkReader::new_unchecked(p.value().raw_data()).unpack(); - (k, v) - }) - .collect() - } -} - -impl<'r> Unpack> - for persisted::ProposalShortIdKeyValueVecReader<'r> -{ - fn unpack(&self) -> HashMap { - self.iter() - .map(|p| { - let k = p.key().to_entity(); - let v = persisted::DefectEntryReader::new_unchecked(p.value().raw_data()).unpack(); - (k, v) - }) - .collect() - } -} - -impl Pack for component::SortedTxMap { - fn pack(&self) -> persisted::SortedTxMap { - let sorted_index = persisted::AncestorsScoreSortKeyVec::new_builder() - .set(self.sorted_index.iter().map(|v| v.pack()).collect()) - .build(); - persisted::SortedTxMap::new_builder() - .entries(self.entries.pack()) - .sorted_index(sorted_index) - .links(self.links.pack()) - .max_ancestors_count(self.max_ancestors_count.pack()) - .build() - } -} - -impl<'r> Unpack for persisted::SortedTxMapReader<'r> { - fn unpack(&self) -> component::SortedTxMap { - component::SortedTxMap { - entries: self.entries().unpack(), - sorted_index: self.sorted_index().iter().map(|op| op.unpack()).collect(), - links: self.links().unpack(), - max_ancestors_count: self.max_ancestors_count().unpack(), - } - } -} - -impl Pack for (packed::OutPoint, bytes::Bytes) { - fn pack(&self) -> persisted::OutPointKeyValue { - let (ref key, ref value) = self; - persisted::OutPointKeyValue::new_builder() - .key(key.to_owned()) - .value(value.pack()) - .build() - } -} - -impl Pack - for HashMap> -{ - fn pack(&self) -> persisted::OutPointKeyValueVec { - let items = self.iter().map(|(key, value)| { - let value_opt = value - .clone() - .map(|inner| persisted::ProposalShortIdOpt::new_unchecked(inner.as_bytes())) - .unwrap_or_else(Default::default); - (key.to_owned(), value_opt.as_bytes()).pack() - }); - persisted::OutPointKeyValueVec::new_builder() - .extend(items) - .build() - } -} - -impl Pack - for HashMap> -{ - fn pack(&self) -> persisted::OutPointKeyValueVec { - let items = self - .iter() - .map(|(key, value)| (key.to_owned(), value.clone().pack().as_bytes()).pack()); - persisted::OutPointKeyValueVec::new_builder() - .extend(items) - .build() - } -} - -impl Pack - for HashMap> -{ - fn pack(&self) -> persisted::OutPointKeyValueVec { - let items = self.iter().map(|(key, value)| { - (key.to_owned(), value.clone().into_iter().pack().as_bytes()).pack() - }); - persisted::OutPointKeyValueVec::new_builder() - .extend(items) - .build() - } -} - -impl<'r> Unpack>> - for persisted::OutPointKeyValueVecReader<'r> -{ - fn unpack(&self) -> HashMap> { - self.iter() - .map(|p| { - let k = p.key().to_entity(); - let v = persisted::ProposalShortIdOptReader::new_unchecked(p.value().raw_data()) - .to_entity() - .to_opt(); - (k, v) - }) - .collect() - } -} - -impl<'r> Unpack>> - for persisted::OutPointKeyValueVecReader<'r> -{ - fn unpack(&self) -> HashMap> { - self.iter() - .map(|p| { - let k = p.key().to_entity(); - let v = packed::ProposalShortIdVecReader::new_unchecked(p.value().raw_data()) - .to_entity() - .into_iter() - .collect(); - (k, v) - }) - .collect() - } -} - -impl<'r> Unpack>> - for persisted::OutPointKeyValueVecReader<'r> -{ - fn unpack(&self) -> HashMap> { - self.iter() - .map(|p| { - let k = p.key().to_entity(); - let v = packed::ProposalShortIdVecReader::new_unchecked(p.value().raw_data()) - .to_entity() - .into_iter() - .collect(); - (k, v) - }) - .collect() - } -} - -impl Pack - for component::Edges -{ - fn pack(&self) -> persisted::OutPointEdges { - persisted::OutPointEdges::new_builder() - .inner(self.inner.pack()) - .outer(self.outer.pack()) - .deps(self.deps.pack()) - .build() - } -} - -impl<'r> Unpack> - for persisted::OutPointEdgesReader<'r> -{ - fn unpack(&self) -> component::Edges { - component::Edges { - inner: self.inner().unpack(), - outer: self.outer().unpack(), - deps: self.deps().unpack(), - } - } -} - -impl Pack for component::PendingQueue { - fn pack(&self) -> persisted::PendingQueue { - persisted::PendingQueue::new_builder() - .inner(self.inner.pack()) - .build() - } -} - -impl<'r> Unpack for persisted::PendingQueueReader<'r> { - fn unpack(&self) -> component::PendingQueue { - component::PendingQueue { - inner: self.inner().unpack(), - } - } -} - -impl Pack for component::ProposedPool { - fn pack(&self) -> persisted::ProposedPool { - persisted::ProposedPool::new_builder() - .edges(self.edges.pack()) - .inner(self.inner.pack()) - .build() - } -} - -impl<'r> Unpack for persisted::ProposedPoolReader<'r> { - fn unpack(&self) -> component::ProposedPool { - component::ProposedPool { - edges: self.edges().unpack(), - inner: self.inner().unpack(), - } - } -} - -impl Pack for component::OrphanPool { - fn pack(&self) -> persisted::OrphanPool { - persisted::OrphanPool::new_builder() - .vertices(self.vertices.pack()) - .edges(self.edges.pack()) - .prune_threshold(self.prune_threshold.pack()) - .build() - } -} - -impl<'r> Unpack for persisted::OrphanPoolReader<'r> { - fn unpack(&self) -> component::OrphanPool { - component::OrphanPool { - vertices: self.vertices().unpack(), - edges: self.edges().unpack(), - prune_threshold: self.prune_threshold().unpack(), - } - } -} diff --git a/tx-pool/src/persisted/generated.rs b/tx-pool/src/persisted/generated.rs deleted file mode 100644 index d0f4285a50..0000000000 --- a/tx-pool/src/persisted/generated.rs +++ /dev/null @@ -1,9 +0,0 @@ -//! Generated packed bytes wrappers. - -#![doc(hidden)] -#![allow(warnings)] -#![allow(clippy::all)] - -extern crate molecule; - -include!(concat!(env!("OUT_DIR"), "/", "persisted", ".rs")); diff --git a/tx-pool/src/persisted/mod.rs b/tx-pool/src/persisted/mod.rs deleted file mode 100644 index 667817a4c3..0000000000 --- a/tx-pool/src/persisted/mod.rs +++ /dev/null @@ -1,9 +0,0 @@ -use ckb_types::packed as referenced; - -mod conversion; -mod generated; - -pub(crate) use generated::*; - -/// The version of the persisted tx-pool data. -pub(crate) const VERSION: u32 = 1; diff --git a/tx-pool/src/pool.rs b/tx-pool/src/pool.rs index 43884ed390..ba9853b930 100644 --- a/tx-pool/src/pool.rs +++ b/tx-pool/src/pool.rs @@ -20,7 +20,6 @@ use ckb_types::{ Cycle, TransactionView, }, packed::{Byte32, OutPoint, ProposalShortId}, - prelude::*, }; use ckb_verification::{cache::CacheEntry, TxVerifyEnv}; use faketime::unix_time_as_millis; @@ -94,7 +93,6 @@ impl TxPool { const COMMITTED_HASH_CACHE_SIZE: usize = 100_000; TxPool { - config, pending: PendingQueue::new(), gap: PendingQueue::new(), proposed: ProposedPool::new(config.max_ancestors_count), @@ -102,6 +100,7 @@ impl TxPool { last_txs_updated_at, total_tx_size: 0, total_tx_cycles: 0, + config, snapshot, } } diff --git a/tx-pool/src/process.rs b/tx-pool/src/process.rs index 24102371c5..19b14a41eb 100644 --- a/tx-pool/src/process.rs +++ b/tx-pool/src/process.rs @@ -982,16 +982,16 @@ impl TxPoolService { pub(crate) async fn clear_pool(&mut self, new_snapshot: Arc) { let mut tx_pool = self.tx_pool.write().await; - let config = tx_pool.config; + let config = tx_pool.config.clone(); self.last_txs_updated_at = Arc::new(AtomicU64::new(0)); *tx_pool = TxPool::new(config, new_snapshot, Arc::clone(&self.last_txs_updated_at)); } - pub(crate) async fn save_pool(&self) -> Result<(), AnyError> { - let tx_pool = self.tx_pool.read().await; - tx_pool - .persisted_data() - .save_into_file(&tx_pool.config.persisted_data) + pub(crate) async fn save_pool(&mut self) { + let mut tx_pool = self.tx_pool.write().await; + if let Err(err) = tx_pool.save_into_file() { + error!("failed to save pool, error: {:?}", err) + } } } diff --git a/tx-pool/src/service.rs b/tx-pool/src/service.rs index 4af4fed843..47f18cd106 100644 --- a/tx-pool/src/service.rs +++ b/tx-pool/src/service.rs @@ -14,6 +14,7 @@ use ckb_channel::oneshot; use ckb_error::{AnyError, Error}; use ckb_jsonrpc_types::BlockTemplate; use ckb_logger::error; +use ckb_logger::info; use ckb_network::{NetworkController, PeerIndex}; use ckb_snapshot::{Snapshot, SnapshotMgr}; use ckb_stop_handler::{SignalSender, StopHandler, WATCH_INIT}; @@ -97,7 +98,7 @@ pub(crate) enum Message { ClearPool(Request, ()>), GetAllEntryInfo(Request<(), TxPoolEntryInfo>), GetAllIds(Request<(), TxPoolIds>), - SavePool(Request<(), Result<(), AnyError>>), + SavePool(Request<(), ()>), } /// Controller to the tx-pool service. @@ -403,7 +404,7 @@ impl TxPoolController { .map_err(Into::into) } - /// send suspend chunk process cmd + /// Sends suspend chunk process cmd pub fn suspend_chunk_process(&self) -> Result<(), AnyError> { self.chunk_tx .try_send(Command::Suspend) @@ -411,7 +412,7 @@ impl TxPoolController { .map_err(Into::into) } - /// send continue chunk process cmd + /// Sends continue chunk process cmd pub fn continue_chunk_process(&self) -> Result<(), AnyError> { self.chunk_tx .try_send(Command::Continue) @@ -419,25 +420,40 @@ impl TxPoolController { .map_err(Into::into) } - fn load_persisted_data(&self, data: persisted::TxPool) -> Result<(), AnyError> { - // a trick to commit transactions with the correct order - let mut remain_size = data.transactions().len(); - let mut txs_next_turn = Vec::new(); - for tx in data.transactions() { - let tx_view = tx.into_view(); - if self.submit_local_tx(tx_view.clone())?.is_err() { - txs_next_turn.push(tx_view); - } - } - while !txs_next_turn.is_empty() && remain_size != txs_next_turn.len() { - remain_size = txs_next_turn.len(); - let mut txs_failed = Vec::new(); - for tx in txs_next_turn { - if self.submit_local_tx(tx.clone())?.is_err() { - txs_failed.push(tx); + /// Saves tx pool into disk. + pub fn save_pool(&self) -> Result<(), AnyError> { + info!("Please be patient, tx-pool are saving data into disk ..."); + let (responder, response) = oneshot::channel(); + let request = Request::call((), responder); + self.sender + .try_send(Message::SavePool(request)) + .map_err(|e| { + let (_m, e) = handle_try_send_error(e); + e + })?; + block_in_place(|| response.recv()) + .map_err(handle_recv_error) + .map_err(Into::into) + } + + /// Load persisted txs into pool, assume that all txs are sorted + fn load_persisted_data(&self, txs: Vec) -> Result<(), AnyError> { + if !txs.is_empty() { + info!("Loading persisted tx-pool data, total {} txs", txs.len()); + let mut failed_txs = 0; + for tx in txs { + if self.submit_local_tx(tx)?.is_err() { + failed_txs += 1; } } - txs_next_turn = txs_failed; + if failed_txs == 0 { + info!("Persisted tx-pool data is loaded"); + } else { + info!( + "Persisted tx-pool data is loaded, {} stale txs are ignored", + failed_txs + ); + } } Ok(()) } @@ -446,6 +462,7 @@ impl TxPoolController { /// A builder used to create TxPoolService. pub struct TxPoolServiceBuilder { pub(crate) tx_pool_config: TxPoolConfig, + pub(crate) tx_pool_controller: TxPoolController, pub(crate) snapshot: Arc, pub(crate) block_assembler: Option, pub(crate) txs_verify_cache: Arc>, @@ -479,8 +496,21 @@ impl TxPoolServiceBuilder { let chunk = Arc::new(RwLock::new(ChunkQueue::new())); let started = Arc::new(AtomicBool::new(false)); + let stop = StopHandler::new(SignalSender::Watch(signal_sender), None); + let chunk_stop = StopHandler::new(SignalSender::Crossbeam(chunk_tx.clone()), None); + let controller = TxPoolController { + sender, + reorg_sender, + handle: handle.clone(), + chunk_stop, + chunk_tx, + stop, + started: Arc::clone(&started), + }; + let builder = TxPoolServiceBuilder { tx_pool_config, + tx_pool_controller: controller.clone(), snapshot, block_assembler: block_assembler_config.map(BlockAssembler::new), txs_verify_cache, @@ -493,20 +523,9 @@ impl TxPoolServiceBuilder { tx_relay_sender, chunk_rx, chunk, - started: Arc::clone(&started), - }; - - let stop = StopHandler::new(SignalSender::Watch(signal_sender), None); - let chunk_stop = StopHandler::new(SignalSender::Crossbeam(chunk_tx.clone()), None); - let controller = TxPoolController { - sender, - reorg_sender, - handle: handle.clone(), - chunk_stop, - chunk_tx, - stop, started, }; + (builder, controller) } @@ -540,8 +559,17 @@ impl TxPoolServiceBuilder { Arc::clone(&last_txs_updated_at), ); + let txs = match tx_pool.load_from_file() { + Ok(txs) => txs, + Err(e) => { + error!("{}", e.to_string()); + error!("Failed to load txs from tx-pool persisted data file, all txs are ignored"); + Vec::new() + } + }; + let service = TxPoolService { - tx_pool_config: Arc::new(tx_pool.config), + tx_pool_config: Arc::new(tx_pool.config.clone()), tx_pool: Arc::new(RwLock::new(tx_pool)), orphan: Arc::new(RwLock::new(OrphanPool::new())), block_assembler: self.block_assembler, @@ -605,7 +633,9 @@ impl TxPoolServiceBuilder { } } }); - + if let Err(err) = self.tx_pool_controller.load_persisted_data(txs) { + error!("Failed to import persisted txs, cause: {}", err); + } self.started.store(true, Ordering::Relaxed); } } @@ -806,8 +836,8 @@ async fn process(mut service: TxPoolService, message: Message) { }; } Message::SavePool(Request { responder, .. }) => { - let result = service.save_pool().await; - if let Err(e) = responder.send(result) { + service.save_pool().await; + if let Err(e) = responder.send(()) { error!("responder send save_pool failed {:?}", e) }; } diff --git a/util/app-config/src/configs/tx_pool.rs b/util/app-config/src/configs/tx_pool.rs index b9a5dc3397..88fa7a3fac 100644 --- a/util/app-config/src/configs/tx_pool.rs +++ b/util/app-config/src/configs/tx_pool.rs @@ -6,7 +6,7 @@ use std::path::{Path, PathBuf}; // The default values are set in the legacy version. /// Transaction pool configuration -#[derive(Clone, Debug, Serialize, Deserialize)] +#[derive(Clone, Debug, Serialize)] pub struct TxPoolConfig { /// Keep the transaction pool below mb pub max_mem_size: usize, @@ -26,19 +26,6 @@ pub struct TxPoolConfig { pub persisted_data: PathBuf, } -impl Default for TxPoolConfig { - fn default() -> Self { - TxPoolConfig { - max_mem_size: 20_000_000, // 20mb - max_cycles: 200_000_000_000, - min_fee_rate: DEFAULT_MIN_FEE_RATE, - max_tx_verify_cycles: DEFAULT_MAX_TX_VERIFY_CYCLES, - max_ancestors_count: DEFAULT_MAX_ANCESTORS_COUNT, - persisted_data: Default::default(), - } - } -} - /// Block assembler config options. /// /// The block assembler section tells CKB how to claim the miner rewards. @@ -68,13 +55,16 @@ const fn default_use_binary_version_as_message_prefix() -> bool { impl TxPoolConfig { /// Canonicalizes paths in the config options. /// - /// If `self.persisted_data` is not set, set it to `data_dir / tx_pool.dat`. + /// If `self.persisted_data` is not set, set it to `data_dir / tx_pool_persisted_data`. /// /// If `self.path` is relative, convert them to absolute path using /// `root_dir` as current working directory. pub fn adjust>(&mut self, root_dir: &Path, data_dir: P) { if self.persisted_data.to_str().is_none() || self.persisted_data.to_str() == Some("") { - self.persisted_data = data_dir.as_ref().to_path_buf().join("tx_pool.dat"); + self.persisted_data = data_dir + .as_ref() + .to_path_buf() + .join("tx_pool_persisted_data"); } else if self.persisted_data.is_relative() { self.persisted_data = root_dir.to_path_buf().join(&self.persisted_data) } diff --git a/util/app-config/src/legacy/tx_pool.rs b/util/app-config/src/legacy/tx_pool.rs index a54ec6b85b..0c491e37ac 100644 --- a/util/app-config/src/legacy/tx_pool.rs +++ b/util/app-config/src/legacy/tx_pool.rs @@ -2,6 +2,7 @@ use ckb_chain_spec::consensus::TWO_IN_TWO_OUT_CYCLES; use ckb_jsonrpc_types::FeeRateDef; use ckb_types::core::{Cycle, FeeRate}; use serde::Deserialize; +use std::path::PathBuf; // default min fee rate, 1000 shannons per kilobyte const DEFAULT_MIN_FEE_RATE: FeeRate = FeeRate::from_u64(1000); @@ -22,6 +23,8 @@ pub(crate) struct TxPoolConfig { min_fee_rate: FeeRate, max_tx_verify_cycles: Cycle, max_ancestors_count: usize, + #[serde(default)] + persisted_data: PathBuf, } impl Default for crate::TxPoolConfig { @@ -41,6 +44,7 @@ impl Default for TxPoolConfig { min_fee_rate: DEFAULT_MIN_FEE_RATE, max_tx_verify_cycles: DEFAULT_MAX_TX_VERIFY_CYCLES, max_ancestors_count: DEFAULT_MAX_ANCESTORS_COUNT, + persisted_data: Default::default(), } } } @@ -56,6 +60,7 @@ impl From for crate::TxPoolConfig { min_fee_rate, max_tx_verify_cycles, max_ancestors_count, + persisted_data, } = input; Self { max_mem_size, @@ -63,6 +68,7 @@ impl From for crate::TxPoolConfig { min_fee_rate, max_tx_verify_cycles, max_ancestors_count, + persisted_data, } } } diff --git a/util/launcher/src/lib.rs b/util/launcher/src/lib.rs index 407565a18f..320d22ffd2 100644 --- a/util/launcher/src/lib.rs +++ b/util/launcher/src/lib.rs @@ -197,7 +197,7 @@ impl Launcher { let (shared, pack) = shared_builder .consensus(self.args.consensus.clone()) - .tx_pool_config(self.args.config.tx_pool) + .tx_pool_config(self.args.config.tx_pool.clone()) .notify_config(self.args.config.notify.clone()) .store_config(self.args.config.store) .block_assembler_config(block_assembler_config)