Skip to content

Commit

Permalink
feat: bans for bad incoming blocks (#5934)
Browse files Browse the repository at this point in the history
Description
---
Bans peers who send bad incoming blocks. 
Consolidates banning a bit more. 

Motivation and Context
---
Created an issue to fix the ban period in the config:
#5933

Fixes: #5795

---------

Co-authored-by: stringhandler <[email protected]>
  • Loading branch information
SWvheerden and stringhandler authored Nov 13, 2023
1 parent fa2fb27 commit 7acc44d
Show file tree
Hide file tree
Showing 20 changed files with 286 additions and 133 deletions.
1 change: 1 addition & 0 deletions applications/minotari_node/src/bootstrap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,7 @@ where B: BlockchainBackend + 'static
self.rules.clone(),
base_node_config.messaging_request_timeout,
self.randomx_factory.clone(),
base_node_config.state_machine.clone(),
))
.add_initializer(MempoolServiceInitializer::new(
self.mempool.clone(),
Expand Down
2 changes: 0 additions & 2 deletions base_layer/core/src/base_node/comms_interface/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,8 +64,6 @@ pub enum CommsInterfaceError {
InternalError(String),
#[error("API responded with an error: {0}")]
ApiError(String),
#[error("Header not found at {0}")]
BlockHeaderNotFound(u64),
#[error("Block error: {0}")]
BlockError(#[from] BlockError),
#[error("Invalid request for {request}: {details}")]
Expand Down
67 changes: 25 additions & 42 deletions base_layer/core/src/base_node/service/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,12 @@
// WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE
// USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

use std::time::Duration;

use tari_comms_dht::outbound::DhtOutboundError;
use thiserror::Error;

use crate::{
base_node::{comms_interface::CommsInterfaceError, service::initializer::ExtractBlockError},
common::BanReason,
common::{BanPeriod, BanReason},
};

#[derive(Debug, Error)]
Expand All @@ -48,58 +46,43 @@ impl BaseNodeServiceError {
pub fn get_ban_reason(&self) -> Option<BanReason> {
match self {
BaseNodeServiceError::CommsInterfaceError(comms) => match comms {
CommsInterfaceError::UnexpectedApiResponse => Some(BanReason {
reason: "Unexpected API response".to_string(),
ban_duration: Duration::from_secs(60),
}),
CommsInterfaceError::RequestTimedOut => Some(BanReason {
reason: "Request timed out".to_string(),
ban_duration: Duration::from_secs(60),
}),
CommsInterfaceError::InvalidPeerResponse(e) => Some(BanReason {
reason: format!("Invalid peer response: {}", e),
ban_duration: Duration::from_secs(60),
}),
CommsInterfaceError::InvalidBlockHeader(e) => Some(BanReason {
reason: format!("Invalid block header: {}", e),
ban_duration: Duration::from_secs(60),
err @ CommsInterfaceError::UnexpectedApiResponse | err @ CommsInterfaceError::RequestTimedOut => {
Some(BanReason {
reason: err.to_string(),
ban_duration: BanPeriod::Short,
})
},
err @ CommsInterfaceError::InvalidPeerResponse(_) |
err @ CommsInterfaceError::InvalidBlockHeader(_) |
err @ CommsInterfaceError::TransactionError(_) |
err @ CommsInterfaceError::InvalidFullBlock { .. } |
err @ CommsInterfaceError::InvalidRequest { .. } => Some(BanReason {
reason: err.to_string(),
ban_duration: BanPeriod::Long,
}),
CommsInterfaceError::TransactionError(e) => Some(BanReason {
reason: format!("Invalid transaction: {}", e),
ban_duration: Duration::from_secs(60),
}),
CommsInterfaceError::InvalidRequest { request, details } => Some(BanReason {
reason: format!("Invalid request: {} ({})", request, details),
ban_duration: Duration::from_secs(60),
CommsInterfaceError::MempoolError(e) => e.get_ban_reason(),
CommsInterfaceError::TransportChannelError(e) => Some(BanReason {
reason: e.to_string(),
ban_duration: BanPeriod::Short,
}),
CommsInterfaceError::ChainStorageError(e) => e.get_ban_reason(),
CommsInterfaceError::MergeMineError(e) => e.get_ban_reason(),
CommsInterfaceError::NoBootstrapNodesConfigured |
CommsInterfaceError::TransportChannelError(_) |
CommsInterfaceError::ChainStorageError(_) |
CommsInterfaceError::OutboundMessageError(_) |
CommsInterfaceError::MempoolError(_) |
CommsInterfaceError::BroadcastFailed |
CommsInterfaceError::InternalChannelError(_) |
CommsInterfaceError::DifficultyAdjustmentManagerError(_) |
CommsInterfaceError::InternalError(_) |
CommsInterfaceError::ApiError(_) |
CommsInterfaceError::BlockHeaderNotFound(_) |
CommsInterfaceError::BlockError(_) |
CommsInterfaceError::InvalidFullBlock { .. } |
CommsInterfaceError::MergeMineError(_) |
CommsInterfaceError::DifficultyError(_) => None,
},
BaseNodeServiceError::DhtOutboundError(_) => None,
BaseNodeServiceError::InvalidRequest(e) => Some(BanReason {
reason: format!("Invalid request: {}", e),
ban_duration: Duration::from_secs(60),
}),
BaseNodeServiceError::InvalidResponse(e) => Some(BanReason {
reason: format!("Invalid response: {}", e),
ban_duration: Duration::from_secs(60),
}),
BaseNodeServiceError::InvalidBlockMessage(e) => Some(BanReason {
reason: format!("Invalid block message: {}", e),
ban_duration: Duration::from_secs(60),
err @ BaseNodeServiceError::InvalidRequest(_) |
err @ BaseNodeServiceError::InvalidResponse(_) |
err @ BaseNodeServiceError::InvalidBlockMessage(_) => Some(BanReason {
reason: err.to_string(),
ban_duration: BanPeriod::Long,
}),
}
}
Expand Down
6 changes: 6 additions & 0 deletions base_layer/core/src/base_node/service/initializer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ use crate::{
base_node::{
comms_interface::{InboundNodeCommsHandlers, LocalNodeCommsInterface, OutboundNodeCommsInterface},
service::service::{BaseNodeService, BaseNodeStreams},
BaseNodeStateMachineConfig,
StateMachineHandle,
},
blocks::NewBlock,
Expand All @@ -68,6 +69,7 @@ pub struct BaseNodeServiceInitializer<T> {
consensus_manager: ConsensusManager,
service_request_timeout: Duration,
randomx_factory: RandomXFactory,
base_node_config: BaseNodeStateMachineConfig,
}

impl<T> BaseNodeServiceInitializer<T>
Expand All @@ -81,6 +83,7 @@ where T: BlockchainBackend
consensus_manager: ConsensusManager,
service_request_timeout: Duration,
randomx_factory: RandomXFactory,
base_node_config: BaseNodeStateMachineConfig,
) -> Self {
Self {
inbound_message_subscription_factory,
Expand All @@ -89,6 +92,7 @@ where T: BlockchainBackend
consensus_manager,
service_request_timeout,
randomx_factory,
base_node_config,
}
}

Expand Down Expand Up @@ -180,6 +184,7 @@ where T: BlockchainBackend + 'static
let mempool = self.mempool.clone();
let consensus_manager = self.consensus_manager.clone();
let randomx_factory = self.randomx_factory.clone();
let config = self.base_node_config.clone();

context.spawn_when_ready(move |handles| async move {
let dht = handles.expect_handle::<Dht>();
Expand Down Expand Up @@ -213,6 +218,7 @@ where T: BlockchainBackend + 'static
service_request_timeout,
state_machine,
connectivity,
config,
)
.start(streams);
futures::pin_mut!(service);
Expand Down
49 changes: 37 additions & 12 deletions base_layer/core/src/base_node/service/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,10 +55,12 @@ use crate::{
comms_interface::{CommsInterfaceError, InboundNodeCommsHandlers, NodeCommsRequest, NodeCommsResponse},
service::{error::BaseNodeServiceError, initializer::ExtractBlockError},
state_machine_service::states::StateInfo,
BaseNodeStateMachineConfig,
StateMachineHandle,
},
blocks::{Block, NewBlock},
chain_storage::{BlockchainBackend, ChainStorageError},
common::BanPeriod,
proto as shared_protos,
proto::base_node as proto,
};
Expand Down Expand Up @@ -97,6 +99,7 @@ pub(super) struct BaseNodeService<B> {
service_request_timeout: Duration,
state_machine_handle: StateMachineHandle,
connectivity: ConnectivityRequester,
base_node_config: BaseNodeStateMachineConfig,
}

impl<B> BaseNodeService<B>
Expand All @@ -108,6 +111,7 @@ where B: BlockchainBackend + 'static
service_request_timeout: Duration,
state_machine_handle: StateMachineHandle,
connectivity: ConnectivityRequester,
base_node_config: BaseNodeStateMachineConfig,
) -> Self {
let (timeout_sender, timeout_receiver) = mpsc::channel(100);
Self {
Expand All @@ -119,6 +123,7 @@ where B: BlockchainBackend + 'static
service_request_timeout,
state_machine_handle,
connectivity,
base_node_config,
}
}

Expand Down Expand Up @@ -256,7 +261,8 @@ where B: BlockchainBackend + 'static
let outbound_message_service = self.outbound_message_service.clone();
let state_machine_handle = self.state_machine_handle.clone();
let mut connectivity = self.connectivity.clone();

let short_ban = self.base_node_config.blockchain_sync_config.short_ban_period;
let long_ban = self.base_node_config.blockchain_sync_config.ban_period;
task::spawn(async move {
let result = handle_incoming_request(
inbound_nch,
Expand All @@ -267,12 +273,12 @@ where B: BlockchainBackend + 'static
.await;
if let Err(e) = result {
if let Some(ban_reason) = e.get_ban_reason() {
let duration = match ban_reason.ban_duration {
BanPeriod::Short => short_ban,
BanPeriod::Long => long_ban,
};
let _drop = connectivity
.ban_peer_until(
domain_msg.source_peer.node_id.clone(),
ban_reason.ban_duration(),
ban_reason.reason().to_string(),
)
.ban_peer_until(domain_msg.source_peer.node_id.clone(), duration, ban_reason.reason)
.await
.map_err(|e| error!(target: LOG_TARGET, "Failed to ban peer: {:?}", e));
}
Expand All @@ -287,18 +293,21 @@ where B: BlockchainBackend + 'static
) {
let waiting_requests = self.waiting_requests.clone();
let mut connectivity_requester = self.connectivity.clone();

let short_ban = self.base_node_config.blockchain_sync_config.short_ban_period;
let long_ban = self.base_node_config.blockchain_sync_config.ban_period;
task::spawn(async move {
let source_peer = domain_msg.source_peer.clone();
let result = handle_incoming_response(waiting_requests, domain_msg).await;

if let Err(e) = result {
if let Some(ban_reason) = e.get_ban_reason() {
let duration = match ban_reason.ban_duration {
BanPeriod::Short => short_ban,
BanPeriod::Long => long_ban,
};
let _drop = connectivity_requester
.ban_peer_until(
source_peer.node_id,
ban_reason.ban_duration(),
ban_reason.reason().to_string(),
)
.ban_peer_until(source_peer.node_id, duration, ban_reason.reason)
.await
.map_err(|e| error!(target: LOG_TARGET, "Failed to ban peer: {:?}", e));
}
Expand Down Expand Up @@ -334,6 +343,10 @@ where B: BlockchainBackend + 'static
return;
}
let inbound_nch = self.inbound_nch.clone();
let mut connectivity_requester = self.connectivity.clone();
let source_peer = new_block.source_peer.clone();
let short_ban = self.base_node_config.blockchain_sync_config.short_ban_period;
let long_ban = self.base_node_config.blockchain_sync_config.ban_period;
task::spawn(async move {
let result = handle_incoming_block(inbound_nch, new_block).await;

Expand All @@ -344,7 +357,19 @@ where B: BlockchainBackend + 'static
))) => {
// Special case, dont log this again as an error
},
Err(e) => error!(target: LOG_TARGET, "Failed to handle incoming block message: {}", e),
Err(e) => {
if let Some(ban_reason) = e.get_ban_reason() {
let duration = match ban_reason.ban_duration {
BanPeriod::Short => short_ban,
BanPeriod::Long => long_ban,
};
let _drop = connectivity_requester
.ban_peer_until(source_peer.node_id, duration, ban_reason.reason)
.await
.map_err(|e| error!(target: LOG_TARGET, "Failed to ban peer: {:?}", e));
}
error!(target: LOG_TARGET, "Failed to handle incoming block message: {}", e)
},
}
});
}
Expand Down
42 changes: 21 additions & 21 deletions base_layer/core/src/base_node/sync/ban.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,12 @@
// WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE
// USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

use std::time::Duration;

use log::*;
use tari_comms::{connectivity::ConnectivityRequester, peer_manager::NodeId};

use crate::{base_node::BlockchainSyncConfig, common::BanReason};
use crate::base_node::BlockchainSyncConfig;

const LOG_TARGET: &str = "c::bn::sync";

Expand All @@ -39,27 +41,25 @@ impl PeerBanManager {
Self { config, connectivity }
}

pub async fn ban_peer_if_required(&mut self, node_id: &NodeId, ban_reason: &Option<BanReason>) {
if let Some(ban) = ban_reason {
if self.config.forced_sync_peers.contains(node_id) {
debug!(
target: LOG_TARGET,
"Not banning peer that is on the allow list for sync. Ban reason = {}", ban.reason()
);
return;
}
debug!(target: LOG_TARGET, "Sync peer {} removed from the sync peer list because {}", node_id, ban.reason());
pub async fn ban_peer_if_required(&mut self, node_id: &NodeId, ban_reason: String, ban_duration: Duration) {
if self.config.forced_sync_peers.contains(node_id) {
debug!(
target: LOG_TARGET,
"Not banning peer that is on the allow list for sync. Ban reason = {}", ban_reason
);
return;
}
debug!(target: LOG_TARGET, "Sync peer {} removed from the sync peer list because {}", node_id, ban_reason);

match self
.connectivity
.ban_peer_until(node_id.clone(), ban.ban_duration, ban.reason().to_string())
.await
{
Ok(_) => {
warn!(target: LOG_TARGET, "Banned sync peer {} for {:?} because {}", node_id, ban.ban_duration, ban.reason())
},
Err(err) => error!(target: LOG_TARGET, "Failed to ban sync peer {}: {}", node_id, err),
}
match self
.connectivity
.ban_peer_until(node_id.clone(), ban_duration, ban_reason.clone())
.await
{
Ok(_) => {
warn!(target: LOG_TARGET, "Banned sync peer {} for {:?} because {}", node_id, ban_duration, ban_reason)
},
Err(err) => error!(target: LOG_TARGET, "Failed to ban sync peer {}: {}", node_id, err),
}
}
}
Loading

0 comments on commit 7acc44d

Please sign in to comment.