Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: node gets banned on reorg #4949

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 8 additions & 28 deletions base_layer/core/src/base_node/comms_interface/comms_request.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,38 +46,20 @@ pub enum NodeCommsRequest {
FetchHeaders(RangeInclusive<u64>),
FetchHeadersByHashes(Vec<HashOutput>),
FetchMatchingUtxos(Vec<HashOutput>),
FetchMatchingBlocks {
range: RangeInclusive<u64>,
compact: bool,
},
FetchBlocksByHash {
block_hashes: Vec<HashOutput>,
compact: bool,
},
FetchMatchingBlocks { range: RangeInclusive<u64>, compact: bool },
FetchBlocksByKernelExcessSigs(Vec<Signature>),
FetchBlocksByUtxos(Vec<Commitment>),
GetHeaderByHash(HashOutput),
GetBlockByHash(HashOutput),
GetNewBlockTemplate(GetNewBlockTemplateRequest),
GetNewBlock(NewBlockTemplate),
GetBlockFromAllChains(HashOutput),
FetchKernelByExcessSig(Signature),
FetchMempoolTransactionsByExcessSigs {
excess_sigs: Vec<PrivateKey>,
},
FetchValidatorNodesKeys {
height: u64,
},
GetShardKey {
height: u64,
public_key: PublicKey,
},
FetchTemplateRegistrations {
start_height: u64,
end_height: u64,
},
FetchUnspentUtxosInBlock {
block_hash: BlockHash,
},
FetchMempoolTransactionsByExcessSigs { excess_sigs: Vec<PrivateKey> },
FetchValidatorNodesKeys { height: u64 },
GetShardKey { height: u64, public_key: PublicKey },
FetchTemplateRegistrations { start_height: u64, end_height: u64 },
FetchUnspentUtxosInBlock { block_hash: BlockHash },
}

#[derive(Debug, Serialize, Deserialize)]
Expand All @@ -100,15 +82,13 @@ impl Display for NodeCommsRequest {
FetchMatchingBlocks { range, compact } => {
write!(f, "FetchMatchingBlocks ({:?}, {})", range, compact)
},
FetchBlocksByHash { block_hashes, compact } => {
write!(f, "FetchBlocksByHash (n={}, {})", block_hashes.len(), compact)
},
FetchBlocksByKernelExcessSigs(v) => write!(f, "FetchBlocksByKernelExcessSigs (n={})", v.len()),
FetchBlocksByUtxos(v) => write!(f, "FetchBlocksByUtxos (n={})", v.len()),
GetHeaderByHash(v) => write!(f, "GetHeaderByHash({})", v.to_hex()),
GetBlockByHash(v) => write!(f, "GetBlockByHash({})", v.to_hex()),
GetNewBlockTemplate(v) => write!(f, "GetNewBlockTemplate ({}) with weight {}", v.algo, v.max_weight),
GetNewBlock(b) => write!(f, "GetNewBlock (Block Height={})", b.header.height),
GetBlockFromAllChains(v) => write!(f, "GetBlockFromAllChains({})", v.to_hex()),
FetchKernelByExcessSig(s) => write!(
f,
"FetchKernelByExcessSig (signature=({}, {}))",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ pub enum NodeCommsResponse {
TransactionKernels(Vec<TransactionKernel>),
BlockHeaders(Vec<ChainHeader>),
BlockHeader(Option<ChainHeader>),
Block(Box<Option<Block>>),
TransactionOutputs(Vec<TransactionOutput>),
HistoricalBlocks(Vec<HistoricalBlock>),
HistoricalBlock(Box<Option<HistoricalBlock>>),
Expand All @@ -70,6 +71,7 @@ impl Display for NodeCommsResponse {
TransactionKernels(_) => write!(f, "TransactionKernel"),
BlockHeaders(_) => write!(f, "BlockHeaders"),
BlockHeader(_) => write!(f, "BlockHeader"),
Block(_) => write!(f, "Block"),
HistoricalBlock(_) => write!(f, "HistoricalBlock"),
TransactionOutputs(_) => write!(f, "TransactionOutputs"),
HistoricalBlocks(_) => write!(f, "HistoricalBlocks"),
Expand Down
76 changes: 42 additions & 34 deletions base_layer/core/src/base_node/comms_interface/inbound_handlers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -169,31 +169,6 @@ where B: BlockchainBackend + 'static
let blocks = self.blockchain_db.fetch_blocks(range, compact).await?;
Ok(NodeCommsResponse::HistoricalBlocks(blocks))
},
NodeCommsRequest::FetchBlocksByHash { block_hashes, compact } => {
let mut blocks = Vec::with_capacity(block_hashes.len());
for block_hash in block_hashes {
let block_hex = block_hash.to_hex();
debug!(
target: LOG_TARGET,
"A peer has requested a block with hash {} (compact = {})", block_hex, compact
);

match self.blockchain_db.fetch_block_by_hash(block_hash, compact).await {
Ok(Some(block)) => blocks.push(block),
Ok(None) => warn!(
target: LOG_TARGET,
"Could not provide requested block {} to peer because not stored", block_hex,
),
Err(e) => warn!(
target: LOG_TARGET,
"Could not provide requested block {} to peer because: {}",
block_hex,
e.to_string()
),
}
}
Ok(NodeCommsResponse::HistoricalBlocks(blocks))
},
NodeCommsRequest::FetchBlocksByKernelExcessSigs(excess_sigs) => {
if excess_sigs.len() > MAX_REQUEST_BY_KERNEL_EXCESS_SIGS {
return Err(CommsInterfaceError::InvalidRequest {
Expand Down Expand Up @@ -342,6 +317,43 @@ where B: BlockchainBackend + 'static
block: Some(block),
})
},
NodeCommsRequest::GetBlockFromAllChains(hash) => {
let block_hex = hash.to_hex();
debug!(
target: LOG_TARGET,
"A peer has requested a block with hash {}", block_hex
);

let maybe_block = match self
.blockchain_db
.fetch_block_by_hash(hash, true)
.await
.unwrap_or_else(|e| {
warn!(
target: LOG_TARGET,
"Could not provide requested block {} to peer because: {}",
block_hex,
e.to_string()
);

None
}) {
None => self.blockchain_db.fetch_orphan(hash).await.map_or_else(
|e| {
warn!(
target: LOG_TARGET,
"Could not provide requested block {} to peer because: {}", block_hex, e,
);

None
},
Some,
),
Some(block) => Some(block.try_into_block()?),
};

Ok(NodeCommsResponse::Block(Box::new(maybe_block)))
},
NodeCommsRequest::FetchKernelByExcessSig(signature) => {
let kernels = match self.blockchain_db.fetch_kernel_by_excess_sig(signature).await {
Ok(Some((kernel, _))) => vec![kernel],
Expand Down Expand Up @@ -596,16 +608,12 @@ where B: BlockchainBackend + 'static
source_peer: NodeId,
block_hash: BlockHash,
) -> Result<Block, CommsInterfaceError> {
let mut historical_block = self
return match self
.outbound_nci
.request_blocks_by_hashes_from_peer(vec![block_hash], Some(source_peer.clone()))
.await?;

return match historical_block.pop() {
Some(block) => {
let block = block.try_into_block()?;
Ok(block)
},
.request_blocks_by_hashes_from_peer(block_hash, Some(source_peer.clone()))
.await?
{
Some(block) => Ok(block),
None => {
if let Err(e) = self
.connectivity
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ use crate::{
NodeCommsRequest,
NodeCommsResponse,
},
blocks::{HistoricalBlock, NewBlock},
blocks::{Block, NewBlock},
};

/// The OutboundNodeCommsInterface provides an interface to request information from remove nodes.
Expand Down Expand Up @@ -60,22 +60,15 @@ impl OutboundNodeCommsInterface {
/// Fetch the Blocks corresponding to the provided block hashes from a specific base node.
pub async fn request_blocks_by_hashes_from_peer(
&mut self,
block_hashes: Vec<BlockHash>,
hash: BlockHash,
node_id: Option<NodeId>,
) -> Result<Vec<HistoricalBlock>, CommsInterfaceError> {
if let NodeCommsResponse::HistoricalBlocks(blocks) = self
) -> Result<Option<Block>, CommsInterfaceError> {
if let NodeCommsResponse::Block(block) = self
.request_sender
.call((
NodeCommsRequest::FetchBlocksByHash {
block_hashes,
// We always request compact inputs from peer
compact: true,
},
node_id,
))
.call((NodeCommsRequest::GetBlockFromAllChains(hash), node_id))
.await??
{
Ok(blocks)
Ok(*block)
} else {
Err(CommsInterfaceError::UnexpectedApiResponse)
}
Expand Down
8 changes: 3 additions & 5 deletions base_layer/core/src/base_node/proto/request.proto
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,7 @@ package tari.base_node;
message BaseNodeServiceRequest {
uint64 request_key = 1;
oneof request {
// Indicates a FetchBlocksByHash request.
FetchBlocksByHashRequest fetch_blocks_by_hash = 8;
GetBlockFromAllChainsRequest get_block_from_all_chains = 8;
ExcessSigs fetch_mempool_transactions_by_excess_sigs = 9;
}
}
Expand All @@ -27,9 +26,8 @@ message BlockHeights {
repeated uint64 heights = 1;
}

message FetchBlocksByHashRequest {
repeated bytes block_hashes = 1;
bool compact = 2;
message GetBlockFromAllChainsRequest {
bytes hash = 1;
}

message Signatures {
Expand Down
23 changes: 6 additions & 17 deletions base_layer/core/src/base_node/proto/request.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,18 +35,10 @@ impl TryInto<NodeCommsRequest> for ProtoNodeCommsRequest {
type Error = String;

fn try_into(self) -> Result<NodeCommsRequest, Self::Error> {
use ProtoNodeCommsRequest::{FetchBlocksByHash, FetchMempoolTransactionsByExcessSigs};
use ProtoNodeCommsRequest::{FetchMempoolTransactionsByExcessSigs, GetBlockFromAllChains};
let request = match self {
FetchBlocksByHash(req) => {
let block_hashes = req
.block_hashes
.into_iter()
.map(|hash| hash.try_into().map_err(|_| "Malformed hash".to_string()))
.collect::<Result<_, _>>()?;
NodeCommsRequest::FetchBlocksByHash {
block_hashes,
compact: req.compact,
}
GetBlockFromAllChains(req) => {
NodeCommsRequest::GetBlockFromAllChains(req.hash.try_into().map_err(|_| "Malformed hash".to_string())?)
},
FetchMempoolTransactionsByExcessSigs(excess_sigs) => {
let excess_sigs = excess_sigs
Expand All @@ -66,13 +58,10 @@ impl TryFrom<NodeCommsRequest> for ProtoNodeCommsRequest {
type Error = String;

fn try_from(request: NodeCommsRequest) -> Result<Self, Self::Error> {
use NodeCommsRequest::{FetchBlocksByHash, FetchMempoolTransactionsByExcessSigs};
use NodeCommsRequest::{FetchMempoolTransactionsByExcessSigs, GetBlockFromAllChains};
match request {
FetchBlocksByHash { block_hashes, compact } => Ok(ProtoNodeCommsRequest::FetchBlocksByHash(
proto::FetchBlocksByHashRequest {
block_hashes: block_hashes.into_iter().map(|hash| hash.to_vec()).collect(),
compact,
},
GetBlockFromAllChains(hash) => Ok(ProtoNodeCommsRequest::GetBlockFromAllChains(
proto::GetBlockFromAllChainsRequest { hash: hash.to_vec() },
)),
FetchMempoolTransactionsByExcessSigs { excess_sigs } => Ok(
ProtoNodeCommsRequest::FetchMempoolTransactionsByExcessSigs(proto::ExcessSigs {
Expand Down
5 changes: 5 additions & 0 deletions base_layer/core/src/base_node/proto/response.proto
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ package tari.base_node;
message BaseNodeServiceResponse {
uint64 request_key = 1;
oneof response {
BlockResponse block_response = 5;
// Indicates a HistoricalBlocks response.
HistoricalBlocks historical_blocks = 6;
FetchMempoolTransactionsResponse fetch_mempool_transactions_by_excess_sigs_response = 7;
Expand Down Expand Up @@ -44,6 +45,10 @@ message HistoricalBlocks {
repeated tari.core.HistoricalBlock blocks = 1;
}

message BlockResponse {
tari.core.Block block = 1;
}

message NewBlockResponse {
bool success = 1;
string error = 2;
Expand Down
30 changes: 28 additions & 2 deletions base_layer/core/src/base_node/proto/response.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,16 +32,17 @@ use tari_utilities::{convert::try_convert_all, ByteArray};
pub use crate::proto::base_node::base_node_service_response::Response as ProtoNodeCommsResponse;
use crate::{
base_node::comms_interface::{FetchMempoolTransactionsResponse, NodeCommsResponse},
blocks::{BlockHeader, HistoricalBlock},
blocks::{Block, BlockHeader, HistoricalBlock},
proto,
};

impl TryInto<NodeCommsResponse> for ProtoNodeCommsResponse {
type Error = String;

fn try_into(self) -> Result<NodeCommsResponse, Self::Error> {
use ProtoNodeCommsResponse::{FetchMempoolTransactionsByExcessSigsResponse, HistoricalBlocks};
use ProtoNodeCommsResponse::{BlockResponse, FetchMempoolTransactionsByExcessSigsResponse, HistoricalBlocks};
let response = match self {
BlockResponse(block) => NodeCommsResponse::Block(Box::new(block.try_into()?)),
HistoricalBlocks(blocks) => {
let blocks = try_convert_all(blocks.blocks)?;
NodeCommsResponse::HistoricalBlocks(blocks)
Expand Down Expand Up @@ -76,6 +77,7 @@ impl TryFrom<NodeCommsResponse> for ProtoNodeCommsResponse {
fn try_from(response: NodeCommsResponse) -> Result<Self, Self::Error> {
use NodeCommsResponse::{FetchMempoolTransactionsByExcessSigsResponse, HistoricalBlocks};
match response {
NodeCommsResponse::Block(block) => Ok(ProtoNodeCommsResponse::BlockResponse((*block).try_into()?)),
HistoricalBlocks(historical_blocks) => {
let historical_blocks = historical_blocks
.into_iter()
Expand Down Expand Up @@ -151,6 +153,30 @@ impl TryInto<Option<HistoricalBlock>> for proto::base_node::HistoricalBlockRespo
}
}

impl TryFrom<Option<Block>> for proto::base_node::BlockResponse {
type Error = String;

fn try_from(v: Option<Block>) -> Result<Self, Self::Error> {
Ok(Self {
block: v.map(TryInto::try_into).transpose()?,
})
}
}

impl TryInto<Option<Block>> for proto::base_node::BlockResponse {
type Error = String;

fn try_into(self) -> Result<Option<Block>, Self::Error> {
match self.block {
Some(block) => {
let block = block.try_into()?;
Ok(Some(block))
},
None => Ok(None),
}
}
}

//---------------------------------- Collection impls --------------------------------------------//

// The following allow `Iterator::collect` to collect into these repeated types
Expand Down