diff --git a/kindelia_core/src/node.rs b/kindelia_core/src/node.rs index 0d9631f..5f82be5 100644 --- a/kindelia_core/src/node.rs +++ b/kindelia_core/src/node.rs @@ -800,6 +800,18 @@ pub fn miner_loop( pub enum NodeError { #[error(transparent)] BlockStorage(#[from] BlockStorageError), + + #[error(transparent)] + BlockLookup(#[from] BlockLookupError), + + #[error(transparent)] + Epoch(#[from] EpochError), + + #[error("Inconsistent data error:\n msg: {msg}\n context: {context}")] + Inconsistency { msg: String, context: String }, + + #[error("Bad Magic! Block's magic value {magic} does not match network_id {network_id}")] + BadMagic { magic: u32, network_id: u32 }, } /// Errors associated with Blocks @@ -914,7 +926,7 @@ impl Node { // - In case of a reorg, rollback to the block before it // - Run that block's code, updating the HVM state // - Updates the longest chain saved on disk - pub fn add_block(&mut self, block: &HashedBlock) { + pub fn add_block(&mut self, block: &HashedBlock) -> Result<(), NodeError> { // Adding a block might trigger the addition of other blocks // that were waiting for it. Because of that, we loop here. @@ -924,7 +936,7 @@ impl Node { while let Some(block) = must_include.pop() { let btime = block.time; // If block is too far into the future, ignore it - if btime >= get_time() + DELAY_TOLERANCE { + if btime >= try_get_time()? + DELAY_TOLERANCE { emit_event!( self.event_emitter, NodeEventType::too_late(&block), @@ -1114,8 +1126,12 @@ impl Node { // This will cause the block to be moved from self.pending to self.block if let Some(wait_list) = self.wait_list.get(&bhash) { for waiting_for_me in wait_list { - must_include - .push(self.pending.remove(waiting_for_me).expect("block")); + must_include.push(self.pending.remove(waiting_for_me).ok_or( + NodeError::Inconsistency { + msg: format!("block {} not found", waiting_for_me), + context: "removing block from pending list".to_string(), + }, + )?); } self.wait_list.remove(&bhash); } @@ -1132,6 +1148,7 @@ impl Node { ); } } + Ok(()) } pub fn compute_block( @@ -1191,20 +1208,22 @@ impl Node { return longest; } - pub fn receive_message(&mut self) { + pub fn receive_message(&mut self) -> Result<(), NodeError> { let mut count = 0; for (addr, msg) in self.comm.proto_recv() { //if count < HANDLE_MESSAGE_LIMIT { TODO: ??? - self.handle_message(addr, &msg); + self.handle_message(addr, &msg)?; count = count + 1; //} } + Ok(()) } - fn receive_request(&mut self) { + fn receive_request(&mut self) -> Result<(), NodeError> { if let Ok(request) = self.query_recv.try_recv() { - self.handle_request(request); + self.handle_request(request)?; } + Ok(()) } pub fn get_block_hash_by_index(&self, index: u64) -> Option { @@ -1273,7 +1292,10 @@ impl Node { Some(RegInfo { ownr, stmt }) } - pub fn handle_request(&mut self, request: NodeRequest) { + pub fn handle_request( + &mut self, + request: NodeRequest, + ) -> Result<(), NodeError> { fn handle_ans_err(req_txt: &str, res: Result<(), T>) { if let Err(_) = res { eprintln!("WARN: failed to send node request {} answer back", req_txt); @@ -1326,19 +1348,16 @@ impl Node { let hashes = self.get_longest_chain(Some(num)); let infos = hashes .iter() - .map(|h| { - self - .get_block_info(h) - .expect("should obtain block info result ok") - .expect("should find block") - }) - .collect(); + .map(|h| self.get_block_info(h)) + .collect::, _>>()? + .ok_or(NodeError::Inconsistency { + msg: format!("block not found in range [{}..{}]", start, end), + context: "processing GetBlocks request".to_string(), + })?; handle_ans_err("GetBlocks", tx.send(infos)); } NodeRequest::GetBlock { hash, tx } => { - let info = self - .get_block_info(&hash) - .expect("should obtain block info result ok"); + let info = self.get_block_info(&hash)?; handle_ans_err("GetBlock", tx.send(info)); } NodeRequest::GetBlockHash { index, tx } => { @@ -1423,6 +1442,7 @@ impl Node { handle_ans_err("Publish", tx.send(result)); } } + Ok(()) } // Sends a block to a target address; also share some random peers @@ -1491,20 +1511,23 @@ impl Node { &mut self, addr: C::Address, msg: &Message, - ) { + ) -> Result<(), NodeError> { if addr != self.addr { match msg { Message::GiveMeThatBlock { magic, .. } | Message::NoticeTheseBlocks { magic, .. } | Message::PleaseMineThisTransaction { magic, .. } => { if magic != &self.network_id { - return; + return Err(NodeError::BadMagic { + magic: *magic, + network_id: self.network_id, + }); } } } self.peers.see_peer( - Peer { address: addr, seen_at: get_time() }, + Peer { address: addr, seen_at: try_get_time()? }, #[cfg(feature = "events")] self.event_emitter.clone(), ); @@ -1566,7 +1589,7 @@ impl Node { // Adds the block to the database for block in &blocks { - self.add_block(&block); + self.add_block(&block)?; } // Requests missing ancestors @@ -1590,6 +1613,7 @@ impl Node { } } } + Ok(()) } pub fn gossip(&mut self, peer_count: u128, message: &Message) { @@ -1623,8 +1647,11 @@ impl Node { pub fn load_blocks(&mut self) -> Result<(), NodeError> { self.storage.disable(); let storage = self.storage.clone(); - storage - .read_blocks(|(block, _file_path)| self.add_block(&block.hashed()))?; + storage.read_blocks(|(block, _file_path)| { + self + .add_block(&block.hashed()) + .map_err(|e| BlockStorageError::Read { source: Box::new(e) }) + })?; self.storage.enable(); Ok(()) } @@ -1646,13 +1673,14 @@ impl Node { self.send_to_miner(MinerMessage::Request { prev: self.tip, body, targ }); } - fn do_handle_mined_block(&mut self) { + fn do_handle_mined_block(&mut self) -> Result<(), NodeError> { if let Some(miner_comm) = &mut self.miner_comm { if let MinerMessage::Answer { block } = miner_comm.read() { - self.add_block(&block); + self.add_block(&block)?; self.broadcast_tip_block(); } } + Ok(()) } /// Builds the body to be mined. @@ -1771,14 +1799,18 @@ impl Node { Task { delay: HANDLE_MESSAGE_DELAY, action: |node| { - node.receive_message(); + if let Err(e) = node.receive_message() { + eprintln!("Error in task receive_message.\n{}", e); + } }, }, // Receives and handles incoming API requests Task { delay: HANDLE_REQUEST_DELAY, action: |node| { - node.receive_request(); + if let Err(e) = node.receive_request() { + eprintln!("Error in task receive_request.\n{}", e); + } }, }, // Forgets inactive peers @@ -1797,7 +1829,7 @@ impl Node { delay: 5_000, action: |node| { if let Err(e) = node.log_heartbeat() { - eprintln!("Error logging heartbeat.\n{}", e); + eprintln!("Error in task log_heartbeat.\n{}", e); } }, }, @@ -1820,7 +1852,9 @@ impl Node { Task { delay: 5, action: |node| { - node.do_handle_mined_block(); + if let Err(e) = node.do_handle_mined_block() { + eprintln!("Error in task handle_mined_block.\n{}", e); + } }, }, ]; @@ -1832,15 +1866,21 @@ impl Node { loop { let now = std::time::Instant::now(); - let system_time = get_time(); // Measured in milliseconds - for (i, task) in tasks.iter().enumerate() { - if last_tick_time[i] + task.delay <= system_time { - (task.action)(&mut self); - last_tick_time[i] = system_time; - } + match try_get_time() { // Measured in milliseconds + Ok(system_time) => { + for (i, task) in tasks.iter().enumerate() { + if last_tick_time[i] + task.delay <= system_time { + (task.action)(&mut self); + last_tick_time[i] = system_time; + } + } + }, + Err(e) => eprintln!("{}\ncontext: main task loop. (tasks paused until system time is fixed)", e), } + let elapsed = now.elapsed(); let extra = std::time::Duration::from_millis(1).checked_sub(elapsed); + // If the elapsed time is less than 1ms, sleep for the remaining time if let Some(extra) = extra { std::thread::sleep(extra); diff --git a/kindelia_core/src/persistence.rs b/kindelia_core/src/persistence.rs index f1d1907..13f97bf 100644 --- a/kindelia_core/src/persistence.rs +++ b/kindelia_core/src/persistence.rs @@ -335,7 +335,9 @@ where height: u128, block: HashedBlock, ) -> Result<(), BlockStorageError>; - fn read_blocks( + fn read_blocks< + F: FnMut((node::Block, PathBuf)) -> Result<(), BlockStorageError>, + >( &self, then: F, ) -> Result<(), BlockStorageError>; @@ -423,9 +425,11 @@ impl BlockStorage for SimpleFileStorage { } Ok(()) } - fn read_blocks( + fn read_blocks< + F: FnMut((node::Block, PathBuf)) -> Result<(), BlockStorageError>, + >( &self, - then: F, + mut then: F, ) -> Result<(), BlockStorageError> { let file_paths = get_ordered_blocks_path(&self.path) .map_err(|e| BlockStorageError::Read { source: Box::new(e) })?; @@ -445,7 +449,9 @@ impl BlockStorage for SimpleFileStorage { .ok_or(BlockStorageError::Serialization { source: None })?; items.push((block, file_path)); } - items.into_iter().for_each(then); + for item in items.into_iter() { + then(item)? + } Ok(()) } fn disable(&mut self) { @@ -525,7 +531,9 @@ pub struct EmptyStorage; impl BlockStorage for EmptyStorage { fn enable(&mut self) {} fn disable(&mut self) {} - fn read_blocks( + fn read_blocks< + F: FnMut((node::Block, PathBuf)) -> Result<(), BlockStorageError>, + >( &self, _: F, ) -> Result<(), BlockStorageError> { diff --git a/kindelia_core/src/util.rs b/kindelia_core/src/util.rs index 7239371..2b4040d 100644 --- a/kindelia_core/src/util.rs +++ b/kindelia_core/src/util.rs @@ -191,7 +191,7 @@ pub(crate) fn get_time_micro() -> u128 { /// since epoch. #[derive(Error, Debug)] #[error("SystemTime precedes the unix epoch. {now:?} < {epoch:?}")] -pub(crate) struct EpochError { +pub struct EpochError { pub now: SystemTime, pub epoch: SystemTime, pub source: SystemTimeError,