From a86ed45ee619840350624afb1e2b893041f36495 Mon Sep 17 00:00:00 2001 From: Eval EXEC Date: Mon, 4 Sep 2023 11:37:58 +0800 Subject: [PATCH] Return malformed_peers from ckb-chain to ckb-sync --- Cargo.lock | 1 + chain/src/chain.rs | 54 +++++++++++++++++++------- shared/Cargo.toml | 1 + shared/src/types/mod.rs | 6 +++ sync/src/synchronizer/block_process.rs | 5 ++- sync/src/synchronizer/mod.rs | 21 ++++++++-- sync/src/types/mod.rs | 18 +++------ 7 files changed, 74 insertions(+), 32 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index ec4a359eb0f..9b7a0e81899 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1265,6 +1265,7 @@ dependencies = [ "ckb-db-schema", "ckb-error", "ckb-logger", + "ckb-network", "ckb-notify", "ckb-proposal-table", "ckb-snapshot", diff --git a/chain/src/chain.rs b/chain/src/chain.rs index 7f42df312e3..cb16c5da0dd 100644 --- a/chain/src/chain.rs +++ b/chain/src/chain.rs @@ -11,11 +11,13 @@ use ckb_logger::{ self, debug, error, info, log_enabled, log_enabled_target, trace, trace_target, warn, }; use ckb_merkle_mountain_range::leaf_index_to_mmr_size; +use ckb_network::PeerId; use ckb_proposal_table::ProposalTable; #[cfg(debug_assertions)] use ckb_rust_unstable_port::IsSorted; use ckb_shared::block_status::BlockStatus; use ckb_shared::shared::Shared; +use ckb_shared::types::VerifyFailedBlockInfo; use ckb_stop_handler::{new_crossbeam_exit_rx, register_thread}; use ckb_store::{attach_block_cell, detach_block_cell, ChainStore, StoreTransaction}; use ckb_systemtime::unix_time_as_millis; @@ -83,7 +85,10 @@ impl ChainController { /// If the block already exists, does nothing and false is returned. /// /// [BlockVerifier] [NonContextualBlockTxsVerifier] [ContextualBlockVerifier] will performed - pub fn process_block(&self, block: Arc) -> Result { + pub fn process_block( + &self, + block: Arc, + ) -> (Result, Vec) { self.internal_process_block(block, Switch::NONE) } @@ -94,7 +99,7 @@ impl ChainController { &self, block: Arc, switch: Switch, - ) -> Result { + ) -> (Result, Vec) { Request::call(&self.process_block_sender, (block, switch)).unwrap_or_else(|| { Err(InternalErrorKind::System .other("Chain service has gone") @@ -248,12 +253,16 @@ pub struct ChainService { unverified_tx: Sender, unverified_rx: Receiver, + + verify_failed_blocks_tx: Sender, + verify_failed_blocks_rx: Receiver, } #[derive(Clone)] struct UnverifiedBlock { block: Arc, parent_header: HeaderView, + peer_id: PeerId, switch: Switch, } @@ -266,6 +275,8 @@ impl ChainService { let (new_block_tx, new_block_rx) = channel::bounded::<(Arc, Switch)>(BLOCK_DOWNLOAD_WINDOW as usize); + let (verify_failed_blocks_tx, verify_failed_blocks_rx) = channel::unbounded(); + ChainService { shared, proposal_table: Arc::new(Mutex::new(proposal_table)), @@ -274,6 +285,8 @@ impl ChainService { unverified_rx, new_block_tx, new_block_rx, + verify_failed_blocks_tx, + verify_failed_blocks_rx, } } @@ -409,9 +422,18 @@ impl ChainService { unverified_block.block.hash(), err ); - // TODO punish the peer who give me the bad block + if let Err(SendError(peer_id)) = + self.verify_failed_blocks_tx.send(VerifyFailedBlockInfo { + block_hash: unverified_block.block.hash(), + peer_id: unverified_block.peer_id, + }) + { + error!( + "send verify_failed_blocks_tx failed for peer: {:?}", + unverified_block.peer_id + ); + } - // TODO decrease unverified_tip let tip = self .shared .store() @@ -496,6 +518,7 @@ impl ChainService { block: descendant.to_owned(), parent_header, switch, + peer_id, }) { Ok(_) => {} Err(err) => error!("send unverified_tx failed: {}", err), @@ -642,23 +665,26 @@ impl ChainService { // make block IO and verify asynchronize #[doc(hidden)] - pub fn process_block_v2(&self, block: Arc, switch: Switch) -> Result { + pub fn process_block_v2( + &self, + block: Arc, + switch: Switch, + ) -> (Result, Vec) { let block_number = block.number(); let block_hash = block.hash(); if block_number < 1 { warn!("receive 0 number block: 0-{}", block_hash); } - // if self - // .shared - // .contains_block_status(&block_hash, BlockStatus::BLOCK_RECEIVED) - // { - // debug!("block {}-{} has been stored", block_number, block_hash); - // return Ok(false); - // } + let failed_blocks_peer_ids: Vec = + self.verify_failed_blocks_rx.iter().collect(); if !switch.disable_non_contextual() { - self.non_contextual_verify(&block)?; + let result = self.non_contextual_verify(&block); + match result { + Err(err) => return (Err(err), failed_blocks_peer_ids), + _ => {} + } } match self.new_block_tx.send((block, switch)) { @@ -676,7 +702,7 @@ impl ChainService { self.shared.get_unverified_tip().number(), ); - Ok(false) + (Ok(false), failed_blocks_peer_ids) } fn accept_block(&self, block: Arc) -> Result, Error> { diff --git a/shared/Cargo.toml b/shared/Cargo.toml index bb74e200b52..ab21fd55ece 100644 --- a/shared/Cargo.toml +++ b/shared/Cargo.toml @@ -26,6 +26,7 @@ ckb-async-runtime = { path = "../util/runtime", version = "= 0.112.0-pre" } ckb-stop-handler = { path = "../util/stop-handler", version = "= 0.112.0-pre" } ckb-constant = { path = "../util/constant", version = "= 0.112.0-pre" } ckb-systemtime = { path = "../util/systemtime", version = "= 0.112.0-pre" } +ckb-network = { path = "../network", version = "= 0.112.0-pre" } ckb-util = { path = "../util", version = "= 0.112.0-pre" } bitflags = "1.0" tokio = { version = "1", features = ["sync"] } diff --git a/shared/src/types/mod.rs b/shared/src/types/mod.rs index 8db42092b12..f0083e6596a 100644 --- a/shared/src/types/mod.rs +++ b/shared/src/types/mod.rs @@ -1,3 +1,4 @@ +use ckb_network::PeerId; use ckb_types::core::{BlockNumber, EpochNumberWithFraction}; use ckb_types::packed::Byte32; use ckb_types::prelude::{Entity, FromSliceShouldBeOk, Reader}; @@ -304,3 +305,8 @@ fn get_skip_height(height: BlockNumber) -> BlockNumber { } pub const SHRINK_THRESHOLD: usize = 300; + +pub struct VerifyFailedBlockInfo { + pub block_hash: Byte32, + pub peer_id: PeerId, +} diff --git a/sync/src/synchronizer/block_process.rs b/sync/src/synchronizer/block_process.rs index b8fc6b5824e..3c58c54a4fb 100644 --- a/sync/src/synchronizer/block_process.rs +++ b/sync/src/synchronizer/block_process.rs @@ -32,7 +32,10 @@ impl<'a> BlockProcess<'a> { let shared = self.synchronizer.shared(); if shared.new_block_received(&block) { - if let Err(err) = self.synchronizer.process_new_block(block.clone()) { + let (this_block_verify_result, maliformed_peers) = + self.synchronizer.process_new_block(block.clone()); + + if let Err(err) = this_block_verify_result { if !is_internal_db_error(&err) { return StatusCode::BlockIsInvalid.with_context(format!( "{}, error: {}", diff --git a/sync/src/synchronizer/mod.rs b/sync/src/synchronizer/mod.rs index 856b0acfde7..571b9f810f7 100644 --- a/sync/src/synchronizer/mod.rs +++ b/sync/src/synchronizer/mod.rs @@ -38,7 +38,7 @@ use ckb_network::{ async_trait, bytes::Bytes, tokio, CKBProtocolContext, CKBProtocolHandler, PeerIndex, ServiceControl, SupportProtocols, }; -use ckb_shared::types::HeaderIndexView; +use ckb_shared::types::{HeaderIndexView, VerifyFailedBlockInfo}; use ckb_stop_handler::{new_crossbeam_exit_rx, register_thread}; use ckb_systemtime::unix_time_as_millis; use ckb_types::{ @@ -295,6 +295,16 @@ impl Synchronizer { let item_bytes = message.as_slice().len() as u64; let status = self.try_process(nc, peer, message); + Self::post_sync_process(nc, peer, item_name, item_bytes, status); + } + + fn post_sync_process( + nc: &dyn CKBProtocolContext, + peer: PeerIndex, + item_name: &str, + item_bytes: u64, + status: Status, + ) { metric_ckb_message_bytes( MetricDirection::In, &SupportProtocols::Sync.name(), @@ -340,14 +350,17 @@ impl Synchronizer { /// Process a new block sync from other peer //TODO: process block which we don't request - pub fn process_new_block(&self, block: core::BlockView) -> Result { + pub fn process_new_block( + &self, + block: core::BlockView, + ) -> (Result, Vec) { let block_hash = block.hash(); let status = self.shared.active_chain().get_block_status(&block_hash); // NOTE: Filtering `BLOCK_STORED` but not `BLOCK_RECEIVED`, is for avoiding // stopping synchronization even when orphan_pool maintains dirty items by bugs. if status.contains(BlockStatus::BLOCK_PARTIAL_STORED) { error!("block {} already partial stored", block_hash); - Ok(false) + (Ok(false), Vec::new()) } else if status.contains(BlockStatus::HEADER_VALID) { self.shared.insert_new_block(&self.chain, Arc::new(block)) } else { @@ -356,7 +369,7 @@ impl Synchronizer { status, block_hash, ); // TODO which error should we return? - Ok(false) + (Ok(false), Vec::new()) } } diff --git a/sync/src/types/mod.rs b/sync/src/types/mod.rs index 966e4e096b0..db2b9263337 100644 --- a/sync/src/types/mod.rs +++ b/sync/src/types/mod.rs @@ -14,7 +14,8 @@ use ckb_constant::sync::{ }; use ckb_error::Error as CKBError; use ckb_logger::{debug, error, trace}; -use ckb_network::{CKBProtocolContext, PeerIndex, SupportProtocols}; +use ckb_network::{CKBProtocolContext, PeerId, PeerIndex, SupportProtocols}; +use ckb_shared::types::VerifyFailedBlockInfo; use ckb_shared::{ block_status::BlockStatus, shared::Shared, @@ -1085,7 +1086,7 @@ impl SyncShared { &self, chain: &ChainController, block: Arc, - ) -> Result { + ) -> (Result, Vec) { // Insert the given block into orphan_block_pool if its parent is not found // if !self.is_stored(&block.parent_hash()) { // debug!( @@ -1166,7 +1167,7 @@ impl SyncShared { &self, chain: &ChainController, block: Arc, - ) -> Result { + ) -> (Result, Vec) { let ret = { let mut assume_valid_target = self.state.assume_valid_target(); if let Some(ref target) = *assume_valid_target { @@ -1183,23 +1184,14 @@ impl SyncShared { chain.process_block(Arc::clone(&block)) } }; + if let Err(ref error) = ret { if !is_internal_db_error(error) { error!("accept block {:?} {}", block, error); self.shared() .insert_block_status(block.header().hash(), BlockStatus::BLOCK_INVALID); } - } else { - // Clear the newly inserted block from block_status_map. - // - // We don't know whether the actual block status is BLOCK_VALID or BLOCK_INVALID. - // So we just simply remove the corresponding in-memory block status, - // and the next time `get_block_status` would acquire the real-time - // status via fetching block_ext from the database. - // self.shared().remove_block_status(&block.as_ref().hash()); - // self.shared().remove_header_view(&block.as_ref().hash()); } - ret }