Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: bans for bad incoming blocks #5934

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions applications/minotari_node/src/bootstrap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,7 @@ where B: BlockchainBackend + 'static
self.rules.clone(),
base_node_config.messaging_request_timeout,
self.randomx_factory.clone(),
base_node_config.state_machine.clone(),
))
.add_initializer(MempoolServiceInitializer::new(
self.mempool.clone(),
Expand Down
2 changes: 0 additions & 2 deletions base_layer/core/src/base_node/comms_interface/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,8 +64,6 @@ pub enum CommsInterfaceError {
InternalError(String),
#[error("API responded with an error: {0}")]
ApiError(String),
#[error("Header not found at {0}")]
BlockHeaderNotFound(u64),
#[error("Block error: {0}")]
BlockError(#[from] BlockError),
#[error("Invalid request for {request}: {details}")]
Expand Down
67 changes: 25 additions & 42 deletions base_layer/core/src/base_node/service/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,12 @@
// WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE
// USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

use std::time::Duration;

use tari_comms_dht::outbound::DhtOutboundError;
use thiserror::Error;

use crate::{
base_node::{comms_interface::CommsInterfaceError, service::initializer::ExtractBlockError},
common::BanReason,
common::{BanPeriod, BanReason},
};

#[derive(Debug, Error)]
Expand All @@ -48,58 +46,43 @@ impl BaseNodeServiceError {
pub fn get_ban_reason(&self) -> Option<BanReason> {
match self {
BaseNodeServiceError::CommsInterfaceError(comms) => match comms {
CommsInterfaceError::UnexpectedApiResponse => Some(BanReason {
reason: "Unexpected API response".to_string(),
ban_duration: Duration::from_secs(60),
}),
CommsInterfaceError::RequestTimedOut => Some(BanReason {
reason: "Request timed out".to_string(),
ban_duration: Duration::from_secs(60),
}),
CommsInterfaceError::InvalidPeerResponse(e) => Some(BanReason {
reason: format!("Invalid peer response: {}", e),
ban_duration: Duration::from_secs(60),
}),
CommsInterfaceError::InvalidBlockHeader(e) => Some(BanReason {
reason: format!("Invalid block header: {}", e),
ban_duration: Duration::from_secs(60),
err @ CommsInterfaceError::UnexpectedApiResponse | err @ CommsInterfaceError::RequestTimedOut => {
Some(BanReason {
reason: err.to_string(),
ban_duration: BanPeriod::Short,
})
},
err @ CommsInterfaceError::InvalidPeerResponse(_) |
SWvheerden marked this conversation as resolved.
Show resolved Hide resolved
err @ CommsInterfaceError::InvalidBlockHeader(_) |
err @ CommsInterfaceError::TransactionError(_) |
err @ CommsInterfaceError::InvalidFullBlock { .. } |
err @ CommsInterfaceError::InvalidRequest { .. } => Some(BanReason {
reason: err.to_string(),
ban_duration: BanPeriod::Long,
}),
CommsInterfaceError::TransactionError(e) => Some(BanReason {
reason: format!("Invalid transaction: {}", e),
ban_duration: Duration::from_secs(60),
}),
CommsInterfaceError::InvalidRequest { request, details } => Some(BanReason {
reason: format!("Invalid request: {} ({})", request, details),
ban_duration: Duration::from_secs(60),
CommsInterfaceError::MempoolError(e) => e.get_ban_reason(),
CommsInterfaceError::TransportChannelError(e) => Some(BanReason {
reason: e.to_string(),
ban_duration: BanPeriod::Short,
}),
CommsInterfaceError::ChainStorageError(e) => e.get_ban_reason(),
CommsInterfaceError::MergeMineError(e) => e.get_ban_reason(),
CommsInterfaceError::NoBootstrapNodesConfigured |
CommsInterfaceError::TransportChannelError(_) |
CommsInterfaceError::ChainStorageError(_) |
CommsInterfaceError::OutboundMessageError(_) |
CommsInterfaceError::MempoolError(_) |
CommsInterfaceError::BroadcastFailed |
CommsInterfaceError::InternalChannelError(_) |
CommsInterfaceError::DifficultyAdjustmentManagerError(_) |
CommsInterfaceError::InternalError(_) |
CommsInterfaceError::ApiError(_) |
CommsInterfaceError::BlockHeaderNotFound(_) |
CommsInterfaceError::BlockError(_) |
CommsInterfaceError::InvalidFullBlock { .. } |
CommsInterfaceError::MergeMineError(_) |
CommsInterfaceError::DifficultyError(_) => None,
},
BaseNodeServiceError::DhtOutboundError(_) => None,
BaseNodeServiceError::InvalidRequest(e) => Some(BanReason {
reason: format!("Invalid request: {}", e),
ban_duration: Duration::from_secs(60),
}),
BaseNodeServiceError::InvalidResponse(e) => Some(BanReason {
reason: format!("Invalid response: {}", e),
ban_duration: Duration::from_secs(60),
}),
BaseNodeServiceError::InvalidBlockMessage(e) => Some(BanReason {
reason: format!("Invalid block message: {}", e),
ban_duration: Duration::from_secs(60),
err @ BaseNodeServiceError::InvalidRequest(_) |
err @ BaseNodeServiceError::InvalidResponse(_) |
err @ BaseNodeServiceError::InvalidBlockMessage(_) => Some(BanReason {
reason: err.to_string(),
ban_duration: BanPeriod::Long,
}),
}
}
Expand Down
6 changes: 6 additions & 0 deletions base_layer/core/src/base_node/service/initializer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ use crate::{
base_node::{
comms_interface::{InboundNodeCommsHandlers, LocalNodeCommsInterface, OutboundNodeCommsInterface},
service::service::{BaseNodeService, BaseNodeStreams},
BaseNodeStateMachineConfig,
StateMachineHandle,
},
blocks::NewBlock,
Expand All @@ -68,6 +69,7 @@ pub struct BaseNodeServiceInitializer<T> {
consensus_manager: ConsensusManager,
service_request_timeout: Duration,
randomx_factory: RandomXFactory,
base_node_config: BaseNodeStateMachineConfig,
}

impl<T> BaseNodeServiceInitializer<T>
Expand All @@ -81,6 +83,7 @@ where T: BlockchainBackend
consensus_manager: ConsensusManager,
service_request_timeout: Duration,
randomx_factory: RandomXFactory,
base_node_config: BaseNodeStateMachineConfig,
) -> Self {
Self {
inbound_message_subscription_factory,
Expand All @@ -89,6 +92,7 @@ where T: BlockchainBackend
consensus_manager,
service_request_timeout,
randomx_factory,
base_node_config,
}
}

Expand Down Expand Up @@ -180,6 +184,7 @@ where T: BlockchainBackend + 'static
let mempool = self.mempool.clone();
let consensus_manager = self.consensus_manager.clone();
let randomx_factory = self.randomx_factory.clone();
let config = self.base_node_config.clone();

context.spawn_when_ready(move |handles| async move {
let dht = handles.expect_handle::<Dht>();
Expand Down Expand Up @@ -213,6 +218,7 @@ where T: BlockchainBackend + 'static
service_request_timeout,
state_machine,
connectivity,
config,
)
.start(streams);
futures::pin_mut!(service);
Expand Down
49 changes: 37 additions & 12 deletions base_layer/core/src/base_node/service/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,10 +55,12 @@ use crate::{
comms_interface::{CommsInterfaceError, InboundNodeCommsHandlers, NodeCommsRequest, NodeCommsResponse},
service::{error::BaseNodeServiceError, initializer::ExtractBlockError},
state_machine_service::states::StateInfo,
BaseNodeStateMachineConfig,
StateMachineHandle,
},
blocks::{Block, NewBlock},
chain_storage::{BlockchainBackend, ChainStorageError},
common::BanPeriod,
proto as shared_protos,
proto::base_node as proto,
};
Expand Down Expand Up @@ -97,6 +99,7 @@ pub(super) struct BaseNodeService<B> {
service_request_timeout: Duration,
state_machine_handle: StateMachineHandle,
connectivity: ConnectivityRequester,
base_node_config: BaseNodeStateMachineConfig,
}

impl<B> BaseNodeService<B>
Expand All @@ -108,6 +111,7 @@ where B: BlockchainBackend + 'static
service_request_timeout: Duration,
state_machine_handle: StateMachineHandle,
connectivity: ConnectivityRequester,
base_node_config: BaseNodeStateMachineConfig,
) -> Self {
let (timeout_sender, timeout_receiver) = mpsc::channel(100);
Self {
Expand All @@ -119,6 +123,7 @@ where B: BlockchainBackend + 'static
service_request_timeout,
state_machine_handle,
connectivity,
base_node_config,
}
}

Expand Down Expand Up @@ -256,7 +261,8 @@ where B: BlockchainBackend + 'static
let outbound_message_service = self.outbound_message_service.clone();
let state_machine_handle = self.state_machine_handle.clone();
let mut connectivity = self.connectivity.clone();

let short_ban = self.base_node_config.blockchain_sync_config.short_ban_period;
let long_ban = self.base_node_config.blockchain_sync_config.ban_period;
task::spawn(async move {
let result = handle_incoming_request(
inbound_nch,
Expand All @@ -267,12 +273,12 @@ where B: BlockchainBackend + 'static
.await;
if let Err(e) = result {
if let Some(ban_reason) = e.get_ban_reason() {
let duration = match ban_reason.ban_duration {
BanPeriod::Short => short_ban,
BanPeriod::Long => long_ban,
};
let _drop = connectivity
.ban_peer_until(
domain_msg.source_peer.node_id.clone(),
ban_reason.ban_duration(),
ban_reason.reason().to_string(),
)
.ban_peer_until(domain_msg.source_peer.node_id.clone(), duration, ban_reason.reason)
.await
.map_err(|e| error!(target: LOG_TARGET, "Failed to ban peer: {:?}", e));
}
Expand All @@ -287,18 +293,21 @@ where B: BlockchainBackend + 'static
) {
let waiting_requests = self.waiting_requests.clone();
let mut connectivity_requester = self.connectivity.clone();

let short_ban = self.base_node_config.blockchain_sync_config.short_ban_period;
let long_ban = self.base_node_config.blockchain_sync_config.ban_period;
task::spawn(async move {
let source_peer = domain_msg.source_peer.clone();
let result = handle_incoming_response(waiting_requests, domain_msg).await;

if let Err(e) = result {
if let Some(ban_reason) = e.get_ban_reason() {
let duration = match ban_reason.ban_duration {
BanPeriod::Short => short_ban,
BanPeriod::Long => long_ban,
};
let _drop = connectivity_requester
.ban_peer_until(
source_peer.node_id,
ban_reason.ban_duration(),
ban_reason.reason().to_string(),
)
.ban_peer_until(source_peer.node_id, duration, ban_reason.reason)
.await
.map_err(|e| error!(target: LOG_TARGET, "Failed to ban peer: {:?}", e));
}
Expand Down Expand Up @@ -334,6 +343,10 @@ where B: BlockchainBackend + 'static
return;
}
let inbound_nch = self.inbound_nch.clone();
let mut connectivity_requester = self.connectivity.clone();
let source_peer = new_block.source_peer.clone();
let short_ban = self.base_node_config.blockchain_sync_config.short_ban_period;
let long_ban = self.base_node_config.blockchain_sync_config.ban_period;
task::spawn(async move {
let result = handle_incoming_block(inbound_nch, new_block).await;

Expand All @@ -344,7 +357,19 @@ where B: BlockchainBackend + 'static
))) => {
// Special case, dont log this again as an error
},
Err(e) => error!(target: LOG_TARGET, "Failed to handle incoming block message: {}", e),
Err(e) => {
if let Some(ban_reason) = e.get_ban_reason() {
let duration = match ban_reason.ban_duration {
BanPeriod::Short => short_ban,
BanPeriod::Long => long_ban,
};
let _drop = connectivity_requester
.ban_peer_until(source_peer.node_id, duration, ban_reason.reason)
.await
.map_err(|e| error!(target: LOG_TARGET, "Failed to ban peer: {:?}", e));
}
error!(target: LOG_TARGET, "Failed to handle incoming block message: {}", e)
},
}
});
}
Expand Down
42 changes: 21 additions & 21 deletions base_layer/core/src/base_node/sync/ban.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,12 @@
// WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE
// USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

use std::time::Duration;

use log::*;
use tari_comms::{connectivity::ConnectivityRequester, peer_manager::NodeId};

use crate::{base_node::BlockchainSyncConfig, common::BanReason};
use crate::base_node::BlockchainSyncConfig;

const LOG_TARGET: &str = "c::bn::sync";

Expand All @@ -39,27 +41,25 @@ impl PeerBanManager {
Self { config, connectivity }
}

pub async fn ban_peer_if_required(&mut self, node_id: &NodeId, ban_reason: &Option<BanReason>) {
if let Some(ban) = ban_reason {
if self.config.forced_sync_peers.contains(node_id) {
debug!(
target: LOG_TARGET,
"Not banning peer that is on the allow list for sync. Ban reason = {}", ban.reason()
);
return;
}
debug!(target: LOG_TARGET, "Sync peer {} removed from the sync peer list because {}", node_id, ban.reason());
pub async fn ban_peer_if_required(&mut self, node_id: &NodeId, ban_reason: String, ban_duration: Duration) {
if self.config.forced_sync_peers.contains(node_id) {
debug!(
target: LOG_TARGET,
"Not banning peer that is on the allow list for sync. Ban reason = {}", ban_reason
);
return;
}
debug!(target: LOG_TARGET, "Sync peer {} removed from the sync peer list because {}", node_id, ban_reason);

match self
.connectivity
.ban_peer_until(node_id.clone(), ban.ban_duration, ban.reason().to_string())
.await
{
Ok(_) => {
warn!(target: LOG_TARGET, "Banned sync peer {} for {:?} because {}", node_id, ban.ban_duration, ban.reason())
},
Err(err) => error!(target: LOG_TARGET, "Failed to ban sync peer {}: {}", node_id, err),
}
match self
.connectivity
.ban_peer_until(node_id.clone(), ban_duration, ban_reason.clone())
.await
{
Ok(_) => {
warn!(target: LOG_TARGET, "Banned sync peer {} for {:?} because {}", node_id, ban_duration, ban_reason)
},
Err(err) => error!(target: LOG_TARGET, "Failed to ban sync peer {}: {}", node_id, err),
}
}
}
Loading
Loading