Skip to content

Commit

Permalink
Return malformed_peers from ckb-chain to ckb-sync
Browse files Browse the repository at this point in the history
  • Loading branch information
eval-exec committed Sep 4, 2023
1 parent a4f23d4 commit a86ed45
Show file tree
Hide file tree
Showing 7 changed files with 74 additions and 32 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

54 changes: 40 additions & 14 deletions chain/src/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,13 @@ use ckb_logger::{
self, debug, error, info, log_enabled, log_enabled_target, trace, trace_target, warn,
};
use ckb_merkle_mountain_range::leaf_index_to_mmr_size;
use ckb_network::PeerId;
use ckb_proposal_table::ProposalTable;
#[cfg(debug_assertions)]
use ckb_rust_unstable_port::IsSorted;
use ckb_shared::block_status::BlockStatus;
use ckb_shared::shared::Shared;
use ckb_shared::types::VerifyFailedBlockInfo;
use ckb_stop_handler::{new_crossbeam_exit_rx, register_thread};
use ckb_store::{attach_block_cell, detach_block_cell, ChainStore, StoreTransaction};
use ckb_systemtime::unix_time_as_millis;
Expand Down Expand Up @@ -83,7 +85,10 @@ impl ChainController {
/// If the block already exists, does nothing and false is returned.
///
/// [BlockVerifier] [NonContextualBlockTxsVerifier] [ContextualBlockVerifier] will performed
pub fn process_block(&self, block: Arc<BlockView>) -> Result<bool, Error> {
pub fn process_block(
&self,
block: Arc<BlockView>,
) -> (Result<bool, Error>, Vec<VerifyFailedBlockInfo>) {
self.internal_process_block(block, Switch::NONE)
}

Expand All @@ -94,7 +99,7 @@ impl ChainController {
&self,
block: Arc<BlockView>,
switch: Switch,
) -> Result<bool, Error> {
) -> (Result<bool, Error>, Vec<VerifyFailedBlockInfo>) {
Request::call(&self.process_block_sender, (block, switch)).unwrap_or_else(|| {
Err(InternalErrorKind::System
.other("Chain service has gone")
Expand Down Expand Up @@ -248,12 +253,16 @@ pub struct ChainService {

unverified_tx: Sender<UnverifiedBlock>,
unverified_rx: Receiver<UnverifiedBlock>,

verify_failed_blocks_tx: Sender<VerifyFailedBlockInfo>,
verify_failed_blocks_rx: Receiver<VerifyFailedBlockInfo>,
}

#[derive(Clone)]
struct UnverifiedBlock {
block: Arc<BlockView>,
parent_header: HeaderView,
peer_id: PeerId,
switch: Switch,
}

Expand All @@ -266,6 +275,8 @@ impl ChainService {
let (new_block_tx, new_block_rx) =
channel::bounded::<(Arc<BlockView>, Switch)>(BLOCK_DOWNLOAD_WINDOW as usize);

let (verify_failed_blocks_tx, verify_failed_blocks_rx) = channel::unbounded();

ChainService {
shared,
proposal_table: Arc::new(Mutex::new(proposal_table)),
Expand All @@ -274,6 +285,8 @@ impl ChainService {
unverified_rx,
new_block_tx,
new_block_rx,
verify_failed_blocks_tx,
verify_failed_blocks_rx,
}
}

Expand Down Expand Up @@ -409,9 +422,18 @@ impl ChainService {
unverified_block.block.hash(),
err
);
// TODO punish the peer who give me the bad block
if let Err(SendError(peer_id)) =
self.verify_failed_blocks_tx.send(VerifyFailedBlockInfo {
block_hash: unverified_block.block.hash(),
peer_id: unverified_block.peer_id,
})
{
error!(
"send verify_failed_blocks_tx failed for peer: {:?}",
unverified_block.peer_id
);
}

// TODO decrease unverified_tip
let tip = self
.shared
.store()
Expand Down Expand Up @@ -496,6 +518,7 @@ impl ChainService {
block: descendant.to_owned(),
parent_header,
switch,
peer_id,
}) {
Ok(_) => {}
Err(err) => error!("send unverified_tx failed: {}", err),
Expand Down Expand Up @@ -642,23 +665,26 @@ impl ChainService {

// make block IO and verify asynchronize
#[doc(hidden)]
pub fn process_block_v2(&self, block: Arc<BlockView>, switch: Switch) -> Result<bool, Error> {
pub fn process_block_v2(
&self,
block: Arc<BlockView>,
switch: Switch,
) -> (Result<bool, Error>, Vec<VerifyFailedBlockInfo>) {
let block_number = block.number();
let block_hash = block.hash();
if block_number < 1 {
warn!("receive 0 number block: 0-{}", block_hash);
}

// if self
// .shared
// .contains_block_status(&block_hash, BlockStatus::BLOCK_RECEIVED)
// {
// debug!("block {}-{} has been stored", block_number, block_hash);
// return Ok(false);
// }
let failed_blocks_peer_ids: Vec<VerifyFailedBlockInfo> =
self.verify_failed_blocks_rx.iter().collect();

if !switch.disable_non_contextual() {
self.non_contextual_verify(&block)?;
let result = self.non_contextual_verify(&block);
match result {
Err(err) => return (Err(err), failed_blocks_peer_ids),
_ => {}
}
}

match self.new_block_tx.send((block, switch)) {
Expand All @@ -676,7 +702,7 @@ impl ChainService {
self.shared.get_unverified_tip().number(),
);

Ok(false)
(Ok(false), failed_blocks_peer_ids)
}

fn accept_block(&self, block: Arc<BlockView>) -> Result<Option<(HeaderView, U256)>, Error> {
Expand Down
1 change: 1 addition & 0 deletions shared/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ ckb-async-runtime = { path = "../util/runtime", version = "= 0.112.0-pre" }
ckb-stop-handler = { path = "../util/stop-handler", version = "= 0.112.0-pre" }
ckb-constant = { path = "../util/constant", version = "= 0.112.0-pre" }
ckb-systemtime = { path = "../util/systemtime", version = "= 0.112.0-pre" }
ckb-network = { path = "../network", version = "= 0.112.0-pre" }
ckb-util = { path = "../util", version = "= 0.112.0-pre" }
bitflags = "1.0"
tokio = { version = "1", features = ["sync"] }
Expand Down
6 changes: 6 additions & 0 deletions shared/src/types/mod.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use ckb_network::PeerId;
use ckb_types::core::{BlockNumber, EpochNumberWithFraction};
use ckb_types::packed::Byte32;
use ckb_types::prelude::{Entity, FromSliceShouldBeOk, Reader};
Expand Down Expand Up @@ -304,3 +305,8 @@ fn get_skip_height(height: BlockNumber) -> BlockNumber {
}

pub const SHRINK_THRESHOLD: usize = 300;

pub struct VerifyFailedBlockInfo {
pub block_hash: Byte32,
pub peer_id: PeerId,
}
5 changes: 4 additions & 1 deletion sync/src/synchronizer/block_process.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,10 @@ impl<'a> BlockProcess<'a> {
let shared = self.synchronizer.shared();

if shared.new_block_received(&block) {
if let Err(err) = self.synchronizer.process_new_block(block.clone()) {
let (this_block_verify_result, maliformed_peers) =
self.synchronizer.process_new_block(block.clone());

if let Err(err) = this_block_verify_result {
if !is_internal_db_error(&err) {
return StatusCode::BlockIsInvalid.with_context(format!(
"{}, error: {}",
Expand Down
21 changes: 17 additions & 4 deletions sync/src/synchronizer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ use ckb_network::{
async_trait, bytes::Bytes, tokio, CKBProtocolContext, CKBProtocolHandler, PeerIndex,
ServiceControl, SupportProtocols,
};
use ckb_shared::types::HeaderIndexView;
use ckb_shared::types::{HeaderIndexView, VerifyFailedBlockInfo};
use ckb_stop_handler::{new_crossbeam_exit_rx, register_thread};
use ckb_systemtime::unix_time_as_millis;
use ckb_types::{
Expand Down Expand Up @@ -295,6 +295,16 @@ impl Synchronizer {
let item_bytes = message.as_slice().len() as u64;
let status = self.try_process(nc, peer, message);

Self::post_sync_process(nc, peer, item_name, item_bytes, status);
}

fn post_sync_process(
nc: &dyn CKBProtocolContext,
peer: PeerIndex,
item_name: &str,
item_bytes: u64,
status: Status,
) {
metric_ckb_message_bytes(
MetricDirection::In,
&SupportProtocols::Sync.name(),
Expand Down Expand Up @@ -340,14 +350,17 @@ impl Synchronizer {

/// Process a new block sync from other peer
//TODO: process block which we don't request
pub fn process_new_block(&self, block: core::BlockView) -> Result<bool, CKBError> {
pub fn process_new_block(
&self,
block: core::BlockView,
) -> (Result<bool, CKBError>, Vec<VerifyFailedBlockInfo>) {
let block_hash = block.hash();
let status = self.shared.active_chain().get_block_status(&block_hash);
// NOTE: Filtering `BLOCK_STORED` but not `BLOCK_RECEIVED`, is for avoiding
// stopping synchronization even when orphan_pool maintains dirty items by bugs.
if status.contains(BlockStatus::BLOCK_PARTIAL_STORED) {
error!("block {} already partial stored", block_hash);
Ok(false)
(Ok(false), Vec::new())
} else if status.contains(BlockStatus::HEADER_VALID) {
self.shared.insert_new_block(&self.chain, Arc::new(block))
} else {
Expand All @@ -356,7 +369,7 @@ impl Synchronizer {
status, block_hash,
);
// TODO which error should we return?
Ok(false)
(Ok(false), Vec::new())
}
}

Expand Down
18 changes: 5 additions & 13 deletions sync/src/types/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,8 @@ use ckb_constant::sync::{
};
use ckb_error::Error as CKBError;
use ckb_logger::{debug, error, trace};
use ckb_network::{CKBProtocolContext, PeerIndex, SupportProtocols};
use ckb_network::{CKBProtocolContext, PeerId, PeerIndex, SupportProtocols};
use ckb_shared::types::VerifyFailedBlockInfo;
use ckb_shared::{
block_status::BlockStatus,
shared::Shared,
Expand Down Expand Up @@ -1085,7 +1086,7 @@ impl SyncShared {
&self,
chain: &ChainController,
block: Arc<core::BlockView>,
) -> Result<bool, CKBError> {
) -> (Result<bool, CKBError>, Vec<VerifyFailedBlockInfo>) {
// Insert the given block into orphan_block_pool if its parent is not found
// if !self.is_stored(&block.parent_hash()) {
// debug!(
Expand Down Expand Up @@ -1166,7 +1167,7 @@ impl SyncShared {
&self,
chain: &ChainController,
block: Arc<core::BlockView>,
) -> Result<bool, CKBError> {
) -> (Result<bool, CKBError>, Vec<VerifyFailedBlockInfo>) {
let ret = {
let mut assume_valid_target = self.state.assume_valid_target();
if let Some(ref target) = *assume_valid_target {
Expand All @@ -1183,23 +1184,14 @@ impl SyncShared {
chain.process_block(Arc::clone(&block))
}
};

if let Err(ref error) = ret {
if !is_internal_db_error(error) {
error!("accept block {:?} {}", block, error);
self.shared()
.insert_block_status(block.header().hash(), BlockStatus::BLOCK_INVALID);
}
} else {
// Clear the newly inserted block from block_status_map.
//
// We don't know whether the actual block status is BLOCK_VALID or BLOCK_INVALID.
// So we just simply remove the corresponding in-memory block status,
// and the next time `get_block_status` would acquire the real-time
// status via fetching block_ext from the database.
// self.shared().remove_block_status(&block.as_ref().hash());
// self.shared().remove_header_view(&block.as_ref().hash());
}

ret
}

Expand Down

0 comments on commit a86ed45

Please sign in to comment.