Skip to content

Commit

Permalink
Merge pull request #187 from quake/quake/send-chain-txs
Browse files Browse the repository at this point in the history
feat: send chain txs
  • Loading branch information
quake authored Jan 29, 2024
2 parents 789b08c + adb167c commit ba5545e
Show file tree
Hide file tree
Showing 6 changed files with 109 additions and 28 deletions.
8 changes: 7 additions & 1 deletion src/protocols/relayer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,18 @@ pub(crate) struct RelayProtocol {
}

// a simple struct to store the pending transactions in memory with size limit
pub(crate) struct PendingTxs {
pub struct PendingTxs {
txs: LinkedHashMap<packed::Byte32, (packed::Transaction, Cycle, HashSet<PeerId>)>,
updated_at: Instant,
limit: usize,
}

impl Default for PendingTxs {
fn default() -> Self {
Self::new(64)
}
}

impl PendingTxs {
pub fn new(limit: usize) -> Self {
Self {
Expand Down
10 changes: 5 additions & 5 deletions src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -380,7 +380,6 @@ pub struct BlockFilterRpcImpl {
}

pub struct TransactionRpcImpl {
pub(crate) pending_txs: Arc<RwLock<PendingTxs>>,
pub(crate) swc: StorageWithChainData,
pub(crate) consensus: Arc<Consensus>,
}
Expand Down Expand Up @@ -1173,7 +1172,8 @@ impl TransactionRpc for TransactionRpcImpl {
let tx = tx.into_view();
let cycles = verify_tx(tx.clone(), &self.swc, Arc::clone(&self.consensus))
.map_err(|e| Error::invalid_params(format!("invalid transaction: {:?}", e)))?;
self.pending_txs
self.swc
.pending_txs()
.write()
.expect("pending_txs lock is poisoned")
.push(tx.clone(), cycles);
Expand All @@ -1198,7 +1198,8 @@ impl TransactionRpc for TransactionRpcImpl {
}

if let Some((transaction, cycles, _)) = self
.pending_txs
.swc
.pending_txs()
.read()
.expect("pending_txs lock is poisoned")
.get(&tx_hash.pack())
Expand Down Expand Up @@ -1314,11 +1315,10 @@ impl Service {
consensus: Consensus,
) -> Server {
let mut io_handler = IoHandler::new();
let swc = StorageWithChainData::new(storage, Arc::clone(&peers));
let swc = StorageWithChainData::new(storage, Arc::clone(&peers), Arc::clone(&pending_txs));
let block_filter_rpc_impl = BlockFilterRpcImpl { swc: swc.clone() };
let chain_rpc_impl = ChainRpcImpl { swc: swc.clone() };
let transaction_rpc_impl = TransactionRpcImpl {
pending_txs,
swc,
consensus: Arc::new(consensus),
};
Expand Down
46 changes: 42 additions & 4 deletions src/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use ckb_types::{
use rocksdb::{prelude::*, Direction, IteratorMode, WriteBatch, DB};

use crate::error::Result;
use crate::protocols::Peers;
use crate::protocols::{Peers, PendingTxs};

pub const LAST_STATE_KEY: &str = "LAST_STATE";
const GENESIS_BLOCK_KEY: &str = "GENESIS_BLOCK";
Expand Down Expand Up @@ -1128,17 +1128,26 @@ impl HeaderProvider for Storage {
pub struct StorageWithChainData {
storage: Storage,
peers: Arc<Peers>,
pending_txs: Arc<RwLock<PendingTxs>>,
}

impl StorageWithChainData {
pub fn new(storage: Storage, peers: Arc<Peers>) -> Self {
Self { storage, peers }
pub fn new(storage: Storage, peers: Arc<Peers>, pending_txs: Arc<RwLock<PendingTxs>>) -> Self {
Self {
storage,
peers,
pending_txs,
}
}

pub fn storage(&self) -> &Storage {
&self.storage
}

pub fn pending_txs(&self) -> &RwLock<PendingTxs> {
&self.pending_txs
}

pub(crate) fn matched_blocks(&self) -> &RwLock<HashMap<H256, (bool, Option<packed::Block>)>> {
self.peers.matched_blocks()
}
Expand Down Expand Up @@ -1190,7 +1199,36 @@ impl CellDataProvider for StorageWithChainData {

impl CellProvider for StorageWithChainData {
fn cell(&self, out_point: &OutPoint, eager_load: bool) -> CellStatus {
self.storage.cell(out_point, eager_load)
match self.storage.cell(out_point, eager_load) {
CellStatus::Live(cell_meta) => CellStatus::Live(cell_meta),
_ => {
if let Some((tx, _, _)) = self
.pending_txs
.read()
.expect("poisoned")
.get(&out_point.tx_hash())
{
if let Some(cell_output) = tx.raw().outputs().get(out_point.index().unpack()) {
let output_data = tx
.raw()
.outputs_data()
.get(out_point.index().unpack())
.expect("output_data's index should be same as output")
.raw_data();
let output_data_data_hash = CellOutput::calc_data_hash(&output_data);
return CellStatus::Live(CellMeta {
out_point: out_point.clone(),
cell_output,
transaction_info: None,
data_bytes: output_data.len() as u64,
mem_cell_data: Some(output_data),
mem_cell_data_hash: Some(output_data_data_hash),
});
}
}
CellStatus::Unknown
}
}
}
}

Expand Down
2 changes: 1 addition & 1 deletion src/subcmds.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ impl RunConfig {
.expect("build consensus should be OK");
storage.init_genesis_block(consensus.genesis_block().data());

let pending_txs = Arc::new(RwLock::new(PendingTxs::new(64)));
let pending_txs = Arc::new(RwLock::new(PendingTxs::default()));
let max_outbound_peers = self.run_env.network.max_outbound_peers;
let network_state = NetworkState::from_config(self.run_env.network)
.map(|network_state| {
Expand Down
65 changes: 50 additions & 15 deletions src/tests/service.rs

Large diffs are not rendered by default.

6 changes: 4 additions & 2 deletions src/tests/verify.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,8 @@ fn verify_valid_transaction() {
// https://pudge.explorer.nervos.org/transaction/0xf34f4eaac4a662927fb52d4cb608e603150b9e0678a0f5ed941e3cfd5b68fb30
let transaction: packed::Transaction = serde_json::from_str::<Transaction>(r#"{"cell_deps":[{"dep_type":"dep_group","out_point":{"index":"0x0","tx_hash":"0xf8de3bb47d055cdf460d93a2a6e1b05f7432f9777c8c474abf4eec1d4aee5d37"}}],"header_deps":[],"inputs":[{"previous_output":{"index":"0x7","tx_hash":"0x8f8c79eb6671709633fe6a46de93c0fedc9c1b8a6527a18d3983879542635c9f"},"since":"0x0"}],"outputs":[{"capacity":"0x470de4df820000","lock":{"args":"0xff5094c2c5f476fc38510018609a3fd921dd28ad","code_hash":"0x9bd7e06f3ecf4be0f2fcd2188b23f1b9fcc88e5d4b65a8637b17723bbda3cce8","hash_type":"type"},"type":null},{"capacity":"0xb61134e5a35e800","lock":{"args":"0x64257f00b6b63e987609fa9be2d0c86d351020fb","code_hash":"0x9bd7e06f3ecf4be0f2fcd2188b23f1b9fcc88e5d4b65a8637b17723bbda3cce8","hash_type":"type"},"type":null}],"outputs_data":["0x","0x"],"version":"0x0","witnesses":["0x5500000010000000550000005500000041000000af34b54bebf8c5971da6a880f2df5a186c3f8d0b5c9a1fe1a90c95b8a4fb89ef3bab1ccec13797dcb3fee80400f953227dd7741227e08032e3598e16ccdaa49c00"]}"#).unwrap().into();

let swc = StorageWithChainData::new(storage.to_owned(), chain.create_peers());
let swc =
StorageWithChainData::new(storage.to_owned(), chain.create_peers(), Default::default());
let result = verify_tx(transaction.into_view(), &swc, consensus).unwrap();
// please note that the cycle (1682789) of this transaction displayed on the explorer is wrong
// it's fixed in https://github.com/nervosnetwork/ckb/pull/4218
Expand All @@ -43,7 +44,8 @@ fn non_contextual_transaction_verifier() {
let chain = MockChain::new_with_default_pow("non_contextual_transaction_verifier");
let storage = chain.client_storage();
let consensus = Arc::new(chain.consensus().clone());
let swc = StorageWithChainData::new(storage.to_owned(), chain.create_peers());
let swc =
StorageWithChainData::new(storage.to_owned(), chain.create_peers(), Default::default());

// duplicate cell deps base on a valid transaction
// https://pudge.explorer.nervos.org/transaction/0xf34f4eaac4a662927fb52d4cb608e603150b9e0678a0f5ed941e3cfd5b68fb30
Expand Down

0 comments on commit ba5545e

Please sign in to comment.