Skip to content

Commit

Permalink
refactor(node): remove expect calls in node.rs
Browse files Browse the repository at this point in the history
node.rs
 * add 4 variants to NodeError
 * return Result<_, NodeError> from add_block() and
   related Node methods.
 * replace some expect calls with NodeError::Inconsistency
 * return NodeError::BadMagic error from ::handle_message()
 * log error in Node::main if receive_message task fails
 * log error in Node::main if receive_request task fails
 * log error in Node::main if handle_mined_block task fails
 * log error and pause tasks in Node::main if try_get_time() fails
 * replace get_time() with try_get_time() where possible

persistence.rs
 * return Result from closure arg to BlockStorage::read_blocks()

util.rs
 * change EpochError visibility: pub(crate) --> pub
  • Loading branch information
dan-da committed Dec 20, 2022
1 parent b3d837c commit 725da5a
Show file tree
Hide file tree
Showing 3 changed files with 91 additions and 43 deletions.
114 changes: 77 additions & 37 deletions kindelia_core/src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -800,6 +800,18 @@ pub fn miner_loop(
pub enum NodeError {
#[error(transparent)]
BlockStorage(#[from] BlockStorageError),

#[error(transparent)]
BlockLookup(#[from] BlockLookupError),

#[error(transparent)]
Epoch(#[from] EpochError),

#[error("Inconsistent data error:\n msg: {msg}\n context: {context}")]
Inconsistency { msg: String, context: String },

#[error("Bad Magic! Block's magic value {magic} does not match network_id {network_id}")]
BadMagic { magic: u32, network_id: u32 },
}

/// Errors associated with Blocks
Expand Down Expand Up @@ -914,7 +926,7 @@ impl<C: ProtoComm, S: BlockStorage> Node<C, S> {
// - In case of a reorg, rollback to the block before it
// - Run that block's code, updating the HVM state
// - Updates the longest chain saved on disk
pub fn add_block(&mut self, block: &HashedBlock) {
pub fn add_block(&mut self, block: &HashedBlock) -> Result<(), NodeError> {
// Adding a block might trigger the addition of other blocks
// that were waiting for it. Because of that, we loop here.

Expand All @@ -924,7 +936,7 @@ impl<C: ProtoComm, S: BlockStorage> Node<C, S> {
while let Some(block) = must_include.pop() {
let btime = block.time;
// If block is too far into the future, ignore it
if btime >= get_time() + DELAY_TOLERANCE {
if btime >= try_get_time()? + DELAY_TOLERANCE {
emit_event!(
self.event_emitter,
NodeEventType::too_late(&block),
Expand Down Expand Up @@ -1114,8 +1126,12 @@ impl<C: ProtoComm, S: BlockStorage> Node<C, S> {
// This will cause the block to be moved from self.pending to self.block
if let Some(wait_list) = self.wait_list.get(&bhash) {
for waiting_for_me in wait_list {
must_include
.push(self.pending.remove(waiting_for_me).expect("block"));
must_include.push(self.pending.remove(waiting_for_me).ok_or(
NodeError::Inconsistency {
msg: format!("block {} not found", waiting_for_me),
context: "removing block from pending list".to_string(),
},
)?);
}
self.wait_list.remove(&bhash);
}
Expand All @@ -1132,6 +1148,7 @@ impl<C: ProtoComm, S: BlockStorage> Node<C, S> {
);
}
}
Ok(())
}

pub fn compute_block(
Expand Down Expand Up @@ -1191,20 +1208,22 @@ impl<C: ProtoComm, S: BlockStorage> Node<C, S> {
return longest;
}

pub fn receive_message(&mut self) {
pub fn receive_message(&mut self) -> Result<(), NodeError> {
let mut count = 0;
for (addr, msg) in self.comm.proto_recv() {
//if count < HANDLE_MESSAGE_LIMIT { TODO: ???
self.handle_message(addr, &msg);
self.handle_message(addr, &msg)?;
count = count + 1;
//}
}
Ok(())
}

fn receive_request(&mut self) {
fn receive_request(&mut self) -> Result<(), NodeError> {
if let Ok(request) = self.query_recv.try_recv() {
self.handle_request(request);
self.handle_request(request)?;
}
Ok(())
}

pub fn get_block_hash_by_index(&self, index: u64) -> Option<U256> {
Expand Down Expand Up @@ -1273,7 +1292,10 @@ impl<C: ProtoComm, S: BlockStorage> Node<C, S> {
Some(RegInfo { ownr, stmt })
}

pub fn handle_request(&mut self, request: NodeRequest<C>) {
pub fn handle_request(
&mut self,
request: NodeRequest<C>,
) -> Result<(), NodeError> {
fn handle_ans_err<T>(req_txt: &str, res: Result<(), T>) {
if let Err(_) = res {
eprintln!("WARN: failed to send node request {} answer back", req_txt);
Expand Down Expand Up @@ -1326,19 +1348,16 @@ impl<C: ProtoComm, S: BlockStorage> Node<C, S> {
let hashes = self.get_longest_chain(Some(num));
let infos = hashes
.iter()
.map(|h| {
self
.get_block_info(h)
.expect("should obtain block info result ok")
.expect("should find block")
})
.collect();
.map(|h| self.get_block_info(h))
.collect::<Result<Option<_>, _>>()?
.ok_or(NodeError::Inconsistency {
msg: format!("block not found in range [{}..{}]", start, end),
context: "processing GetBlocks request".to_string(),
})?;
handle_ans_err("GetBlocks", tx.send(infos));
}
NodeRequest::GetBlock { hash, tx } => {
let info = self
.get_block_info(&hash)
.expect("should obtain block info result ok");
let info = self.get_block_info(&hash)?;
handle_ans_err("GetBlock", tx.send(info));
}
NodeRequest::GetBlockHash { index, tx } => {
Expand Down Expand Up @@ -1423,6 +1442,7 @@ impl<C: ProtoComm, S: BlockStorage> Node<C, S> {
handle_ans_err("Publish", tx.send(result));
}
}
Ok(())
}

// Sends a block to a target address; also share some random peers
Expand Down Expand Up @@ -1491,20 +1511,23 @@ impl<C: ProtoComm, S: BlockStorage> Node<C, S> {
&mut self,
addr: C::Address,
msg: &Message<C::Address>,
) {
) -> Result<(), NodeError> {
if addr != self.addr {
match msg {
Message::GiveMeThatBlock { magic, .. }
| Message::NoticeTheseBlocks { magic, .. }
| Message::PleaseMineThisTransaction { magic, .. } => {
if magic != &self.network_id {
return;
return Err(NodeError::BadMagic {
magic: *magic,
network_id: self.network_id,
});
}
}
}

self.peers.see_peer(
Peer { address: addr, seen_at: get_time() },
Peer { address: addr, seen_at: try_get_time()? },
#[cfg(feature = "events")]
self.event_emitter.clone(),
);
Expand Down Expand Up @@ -1566,7 +1589,7 @@ impl<C: ProtoComm, S: BlockStorage> Node<C, S> {

// Adds the block to the database
for block in &blocks {
self.add_block(&block);
self.add_block(&block)?;
}

// Requests missing ancestors
Expand All @@ -1590,6 +1613,7 @@ impl<C: ProtoComm, S: BlockStorage> Node<C, S> {
}
}
}
Ok(())
}

pub fn gossip(&mut self, peer_count: u128, message: &Message<C::Address>) {
Expand Down Expand Up @@ -1623,8 +1647,11 @@ impl<C: ProtoComm, S: BlockStorage> Node<C, S> {
pub fn load_blocks(&mut self) -> Result<(), NodeError> {
self.storage.disable();
let storage = self.storage.clone();
storage
.read_blocks(|(block, _file_path)| self.add_block(&block.hashed()))?;
storage.read_blocks(|(block, _file_path)| {
self
.add_block(&block.hashed())
.map_err(|e| BlockStorageError::Read { source: Box::new(e) })
})?;
self.storage.enable();
Ok(())
}
Expand All @@ -1646,13 +1673,14 @@ impl<C: ProtoComm, S: BlockStorage> Node<C, S> {
self.send_to_miner(MinerMessage::Request { prev: self.tip, body, targ });
}

fn do_handle_mined_block(&mut self) {
fn do_handle_mined_block(&mut self) -> Result<(), NodeError> {
if let Some(miner_comm) = &mut self.miner_comm {
if let MinerMessage::Answer { block } = miner_comm.read() {
self.add_block(&block);
self.add_block(&block)?;
self.broadcast_tip_block();
}
}
Ok(())
}

/// Builds the body to be mined.
Expand Down Expand Up @@ -1771,14 +1799,18 @@ impl<C: ProtoComm, S: BlockStorage> Node<C, S> {
Task {
delay: HANDLE_MESSAGE_DELAY,
action: |node| {
node.receive_message();
if let Err(e) = node.receive_message() {
eprintln!("Error in task receive_message.\n{}", e);
}
},
},
// Receives and handles incoming API requests
Task {
delay: HANDLE_REQUEST_DELAY,
action: |node| {
node.receive_request();
if let Err(e) = node.receive_request() {
eprintln!("Error in task receive_request.\n{}", e);
}
},
},
// Forgets inactive peers
Expand All @@ -1797,7 +1829,7 @@ impl<C: ProtoComm, S: BlockStorage> Node<C, S> {
delay: 5_000,
action: |node| {
if let Err(e) = node.log_heartbeat() {
eprintln!("Error logging heartbeat.\n{}", e);
eprintln!("Error in task log_heartbeat.\n{}", e);
}
},
},
Expand All @@ -1820,7 +1852,9 @@ impl<C: ProtoComm, S: BlockStorage> Node<C, S> {
Task {
delay: 5,
action: |node| {
node.do_handle_mined_block();
if let Err(e) = node.do_handle_mined_block() {
eprintln!("Error in task handle_mined_block.\n{}", e);
}
},
},
];
Expand All @@ -1832,15 +1866,21 @@ impl<C: ProtoComm, S: BlockStorage> Node<C, S> {

loop {
let now = std::time::Instant::now();
let system_time = get_time(); // Measured in milliseconds
for (i, task) in tasks.iter().enumerate() {
if last_tick_time[i] + task.delay <= system_time {
(task.action)(&mut self);
last_tick_time[i] = system_time;
}
match try_get_time() { // Measured in milliseconds
Ok(system_time) => {
for (i, task) in tasks.iter().enumerate() {
if last_tick_time[i] + task.delay <= system_time {
(task.action)(&mut self);
last_tick_time[i] = system_time;
}
}
},
Err(e) => eprintln!("{}\ncontext: main task loop. (tasks paused until system time is fixed)", e),
}

let elapsed = now.elapsed();
let extra = std::time::Duration::from_millis(1).checked_sub(elapsed);

// If the elapsed time is less than 1ms, sleep for the remaining time
if let Some(extra) = extra {
std::thread::sleep(extra);
Expand Down
18 changes: 13 additions & 5 deletions kindelia_core/src/persistence.rs
Original file line number Diff line number Diff line change
Expand Up @@ -335,7 +335,9 @@ where
height: u128,
block: HashedBlock,
) -> Result<(), BlockStorageError>;
fn read_blocks<F: FnMut((node::Block, PathBuf))>(
fn read_blocks<
F: FnMut((node::Block, PathBuf)) -> Result<(), BlockStorageError>,
>(
&self,
then: F,
) -> Result<(), BlockStorageError>;
Expand Down Expand Up @@ -423,9 +425,11 @@ impl BlockStorage for SimpleFileStorage {
}
Ok(())
}
fn read_blocks<F: FnMut((node::Block, PathBuf))>(
fn read_blocks<
F: FnMut((node::Block, PathBuf)) -> Result<(), BlockStorageError>,
>(
&self,
then: F,
mut then: F,
) -> Result<(), BlockStorageError> {
let file_paths = get_ordered_blocks_path(&self.path)
.map_err(|e| BlockStorageError::Read { source: Box::new(e) })?;
Expand All @@ -445,7 +449,9 @@ impl BlockStorage for SimpleFileStorage {
.ok_or(BlockStorageError::Serialization { source: None })?;
items.push((block, file_path));
}
items.into_iter().for_each(then);
for item in items.into_iter() {
then(item)?
}
Ok(())
}
fn disable(&mut self) {
Expand Down Expand Up @@ -525,7 +531,9 @@ pub struct EmptyStorage;
impl BlockStorage for EmptyStorage {
fn enable(&mut self) {}
fn disable(&mut self) {}
fn read_blocks<F: FnMut((node::Block, PathBuf))>(
fn read_blocks<
F: FnMut((node::Block, PathBuf)) -> Result<(), BlockStorageError>,
>(
&self,
_: F,
) -> Result<(), BlockStorageError> {
Expand Down
2 changes: 1 addition & 1 deletion kindelia_core/src/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,7 @@ pub(crate) fn get_time_micro() -> u128 {
/// since epoch.
#[derive(Error, Debug)]
#[error("SystemTime precedes the unix epoch. {now:?} < {epoch:?}")]
pub(crate) struct EpochError {
pub struct EpochError {
pub now: SystemTime,
pub epoch: SystemTime,
pub source: SystemTimeError,
Expand Down

0 comments on commit 725da5a

Please sign in to comment.