Skip to content

Commit

Permalink
Merge pull request #836 from SaitoTech/feature/828/peer_file
Browse files Browse the repository at this point in the history
Feature/828/peer file
  • Loading branch information
SankaD authored Oct 8, 2024
2 parents 0ac586a + e4586e4 commit 7c28788
Show file tree
Hide file tree
Showing 32 changed files with 206 additions and 68 deletions.
4 changes: 2 additions & 2 deletions saito-core/src/core/consensus/blockchain_sync_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use std::collections::VecDeque;
use std::sync::Arc;

use crate::core::consensus::blockchain::Blockchain;
use crate::core::consensus::peer_collection::PeerCollection;
use crate::core::consensus::peers::peer_collection::PeerCollection;
use ahash::HashMap;
use log::{debug, error, info, trace, warn};
use tokio::sync::RwLock;
Expand Down Expand Up @@ -173,7 +173,7 @@ impl BlockchainSyncState {
let mut allowed_quota = self.batch_size - fetching_count;

for block_data in deq.iter_mut() {
// we limit concurrent fetches to this amount
// we peers concurrent fetches to this amount
if allowed_quota == 0 {
// we have reached allowed concurrent fetches quota.
break;
Expand Down
2 changes: 0 additions & 2 deletions saito-core/src/core/consensus/limit/mod.rs

This file was deleted.

5 changes: 1 addition & 4 deletions saito-core/src/core/consensus/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,9 @@ pub mod burnfee;
pub mod context;
pub mod golden_ticket;
pub mod hop;
pub mod limit;
pub mod mempool;
pub mod merkle;
pub mod peer;
pub mod peer_collection;
pub mod peer_service;
pub mod peers;
pub mod ringitem;
pub mod slip;
pub mod transaction;
Expand Down
6 changes: 6 additions & 0 deletions saito-core/src/core/consensus/peers/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
pub mod block_depth_limit_checker;
pub mod peer;
pub mod peer_collection;
pub mod peer_service;
pub mod peer_state_writer;
pub mod rate_limiter;
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use crate::core::consensus::limit::block_depth_limit_checker::BlockDepthLimitChecker;
use crate::core::consensus::limit::rate_limiter::RateLimiter;
use crate::core::consensus::peer_service::PeerService;
use crate::core::consensus::peers::block_depth_limit_checker::BlockDepthLimitChecker;
use crate::core::consensus::peers::peer_service::PeerService;
use crate::core::consensus::peers::rate_limiter::RateLimiter;
use crate::core::consensus::wallet::Wallet;
use crate::core::defs::{
PeerIndex, PrintForLog, SaitoHash, SaitoPublicKey, Timestamp, WS_KEEP_ALIVE_PERIOD,
Expand Down Expand Up @@ -93,6 +93,15 @@ impl Peer {
pub fn has_invalid_block_limit_exceeded(&mut self, current_time: Timestamp) -> bool {
self.invalid_block_limiter.has_limit_exceeded(current_time)
}
pub fn get_limited_till(&mut self, current_time: Timestamp) -> Option<Timestamp> {
let mut result = None;

if self.has_key_list_limit_exceeded(current_time) {
if self.key_list_limiter.has_limit_exceeded(current_time) {}
}

result
}

pub fn get_url(&self) -> String {
if let Some(config) = self.static_peer_config.as_ref() {
Expand Down Expand Up @@ -426,7 +435,7 @@ impl Peer {

#[cfg(test)]
mod tests {
use crate::core::consensus::peer::{Peer, PeerStatus};
use crate::core::consensus::peers::peer::{Peer, PeerStatus};
use crate::core::process::version::Version;
use std::cmp::Ordering;

Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use crate::core::consensus::peer::{Peer, PeerStatus};
use crate::core::consensus::peers::peer::{Peer, PeerStatus};
use crate::core::consensus::peers::peer_state_writer::PeerStateWriter;
use crate::core::defs::{PeerIndex, SaitoPublicKey, Timestamp};
use std::collections::HashMap;
use std::time::Duration;
Expand All @@ -21,6 +22,7 @@ pub struct PeerCollection {
pub index_to_peers: HashMap<PeerIndex, Peer>,
pub address_to_peers: HashMap<SaitoPublicKey, PeerIndex>,
pub peer_counter: PeerCounter,
pub peer_state_writer: PeerStateWriter,
}

impl PeerCollection {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ impl PeerService {

#[cfg(test)]
mod tests {
use crate::core::consensus::peer_service::PeerService;
use crate::core::consensus::peers::peer_service::PeerService;

#[test]
fn test_serialize() {
Expand Down
95 changes: 95 additions & 0 deletions saito-core/src/core/consensus/peers/peer_state_writer.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
use crate::core::defs::{PeerIndex, PrintForLog, SaitoPublicKey, Timestamp};
use crate::core::io::interface_io::InterfaceIO;
use crate::core::routing_thread::PeerState;
use ahash::HashMap;
use std::io::Error;
use std::time::Duration;

const PEER_STATE_FILENAME: &str = "./data/peer_state.txt";

pub(crate) const PEER_STATE_WRITE_PERIOD: Timestamp =
Duration::from_secs(1).as_millis() as Timestamp;

#[derive(Debug, Clone)]
pub(crate) struct PeerStateEntry {
pub peer_index: PeerIndex,
pub public_key: SaitoPublicKey,
pub msg_limit_exceeded: bool,
pub invalid_blocks_received: bool,
pub same_depth_blocks_received: bool,
pub too_far_blocks_received: bool,
pub handshake_limit_exceeded: bool,
pub keylist_limit_exceeded: bool,
pub limited_till: Option<Timestamp>,
}

impl Default for PeerStateEntry {
fn default() -> Self {
Self {
peer_index: 0,
public_key: [0; 33],
msg_limit_exceeded: false,
invalid_blocks_received: false,
same_depth_blocks_received: false,
too_far_blocks_received: false,
handshake_limit_exceeded: false,
keylist_limit_exceeded: false,
limited_till: None,
}
}
}

#[derive(Clone, Debug, Default)]
pub(crate) struct PeerStateWriter {}

impl PeerStateWriter {
/// Writes peer state data to the file and clears collected state
///
/// # Arguments
///
/// * `io_handler`:
///
/// returns: Result<(), Error>
///
/// # Examples
///
/// ```
///
/// ```
pub(crate) async fn write_state(
&self,
data: Vec<PeerStateEntry>,
io_handler: &mut Box<dyn InterfaceIO + Send + Sync>,
) -> Result<(), Error> {
let line =
"peer_index,public_key,limited_till,msg_limit,invalid_blocks_limit,same_depth_limit,too_far_block_limit,handshake_limit,keylist_limit\r\n"
.to_string();
io_handler
.write_value(PEER_STATE_FILENAME, line.as_bytes())
.await?;

for data in data.iter() {
let line = format!(
"{:?},{:?},{:?},{:?},{:?},{:?},{:?},{:?},{:?}\r\n",
data.peer_index,
data.public_key.to_base58(),
data.limited_till.unwrap_or(0),
data.msg_limit_exceeded,
data.invalid_blocks_received,
data.same_depth_blocks_received,
data.too_far_blocks_received,
data.handshake_limit_exceeded,
data.keylist_limit_exceeded,
);
io_handler
.append_value(PEER_STATE_FILENAME, line.as_bytes())
.await?
}

if !data.is_empty() {
io_handler.flush_data(PEER_STATE_FILENAME).await?;
}

Ok(())
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ impl RateLimiter {
}

pub fn has_limit_exceeded(&mut self, current_time: Timestamp) -> bool {
// TODO : current implementation allows twice the limit from spikes. a sliding window implementation would be better I think.
// TODO : current implementation allows twice the peers from spikes. a sliding window implementation would be better I think.
if current_time.saturating_sub(self.last_request_time) > self.window {
self.request_count = 0;
self.last_request_time = current_time;
Expand Down
2 changes: 1 addition & 1 deletion saito-core/src/core/io/interface_io.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use std::io::Error;

use async_trait::async_trait;

use crate::core::consensus::peer_service::PeerService;
use crate::core::consensus::peers::peer_service::PeerService;
use crate::core::consensus::wallet::Wallet;
use crate::core::defs::{BlockId, PeerIndex, SaitoHash, SaitoPublicKey};
use crate::core::process::version::Version;
Expand Down
12 changes: 6 additions & 6 deletions saito-core/src/core/io/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@ use tokio::sync::RwLock;
use crate::core::consensus::block::Block;
use crate::core::consensus::blockchain::Blockchain;
use crate::core::consensus::mempool::Mempool;
use crate::core::consensus::peer::{Peer, PeerStatus};
use crate::core::consensus::peer_collection::PeerCollection;
use crate::core::consensus::peers::peer::{Peer, PeerStatus};
use crate::core::consensus::peers::peer_collection::PeerCollection;
use crate::core::consensus::transaction::{Transaction, TransactionType};
use crate::core::consensus::wallet::Wallet;
use crate::core::defs::{BlockId, PeerIndex, PrintForLog, SaitoHash, SaitoPublicKey, Timestamp};
Expand Down Expand Up @@ -198,7 +198,7 @@ impl Network {
peer.handshake_limiter.increase();
if peer.has_handshake_limit_exceeded(current_time) {
warn!(
"peer {:?} exceeded rate limit for handshake challenge",
"peer {:?} exceeded rate peers for handshake challenge",
peer_index
);
return;
Expand Down Expand Up @@ -237,7 +237,7 @@ impl Network {
peer.handshake_limiter.increase();
if peer.has_handshake_limit_exceeded(current_time) {
warn!(
"peer {:?} exceeded rate limit for handshake challenge",
"peer {:?} exceeded rate peers for handshake challenge",
peer_index
);
return;
Expand Down Expand Up @@ -304,10 +304,10 @@ impl Network {
let peer = peers.index_to_peers.get_mut(&peer_index);

if let Some(peer) = peer {
// Check rate limit
// Check rate peers
peer.key_list_limiter.increase();
if peer.has_key_list_limit_exceeded(current_time) {
debug!("peer {:?} exceeded rate limit for key list", peer_index);
debug!("peer {:?} exceeded rate peers for key list", peer_index);
return Err(Error::from(ErrorKind::Other));
} else {
debug!("can make request")
Expand Down
2 changes: 1 addition & 1 deletion saito-core/src/core/msg/handshake.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use std::io::{Error, ErrorKind};

use log::{trace, warn};

use crate::core::consensus::peer_service::PeerService;
use crate::core::consensus::peers::peer_service::PeerService;
use crate::core::defs::{SaitoHash, SaitoPublicKey, SaitoSignature};
use crate::core::process::version::Version;
use crate::core::util::serialize::Serialize;
Expand Down
2 changes: 1 addition & 1 deletion saito-core/src/core/msg/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use std::io::{Error, ErrorKind};
use log::{error, warn};

use crate::core::consensus::block::{Block, BlockType};
use crate::core::consensus::peer_service::PeerService;
use crate::core::consensus::peers::peer_service::PeerService;
use crate::core::consensus::transaction::Transaction;
use crate::core::defs::{BlockHash, BlockId, ForkId, SaitoPublicKey};
use crate::core::msg::api_message::ApiMessage;
Expand Down
54 changes: 47 additions & 7 deletions saito-core/src/core/routing_thread.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,16 @@
use std::sync::Arc;
use std::time::Duration;

use ahash::HashMap;
use async_trait::async_trait;
use log::{debug, info, trace, warn};
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::mpsc::Sender;
use tokio::sync::{RwLock, RwLockReadGuard};

use crate::core::consensus::blockchain::Blockchain;
use crate::core::consensus::blockchain_sync_state::BlockchainSyncState;
use crate::core::consensus::mempool::Mempool;
use crate::core::consensus::peer_service::PeerService;
use crate::core::consensus::peers::peer_service::PeerService;
use crate::core::consensus::peers::peer_state_writer::{PeerStateEntry, PEER_STATE_WRITE_PERIOD};
use crate::core::consensus::wallet::Wallet;
use crate::core::consensus_thread::ConsensusEvent;
use crate::core::defs::{
Expand Down Expand Up @@ -89,6 +90,7 @@ pub struct RoutingThread {
pub network: Network,
pub reconnection_timer: Timestamp,
pub peer_removal_timer: Timestamp,
pub peer_file_write_timer: Timestamp,
pub stats: RoutingStats,
pub senders_to_verification: Vec<Sender<VerifyRequest>>,
pub last_verification_thread_index: usize,
Expand Down Expand Up @@ -525,6 +527,41 @@ impl RoutingThread {
warn!("peer {:?} not found to update services", peer_index);
}
}

async fn write_peer_state_data(&mut self, duration_value: Timestamp, work_done: &mut bool) {
self.peer_file_write_timer += duration_value;
if self.peer_file_write_timer >= PEER_STATE_WRITE_PERIOD {
let mut peers = self.network.peer_lock.write().await;
let mut data: Vec<PeerStateEntry> = Default::default();

let current_time = self.timer.get_timestamp_in_ms();

for (peer_index, peer) in peers.index_to_peers.iter_mut() {
if peer.public_key.is_none() {
info!("public key not set yet for peer : {:?}", peer_index);
continue;
}
data.push(PeerStateEntry {
peer_index: peer.index,
public_key: peer.public_key.unwrap_or([0; 33]),
msg_limit_exceeded: peer.has_message_limit_exceeded(current_time),
invalid_blocks_received: peer.has_invalid_block_limit_exceeded(current_time),
same_depth_blocks_received: false,
too_far_blocks_received: false,
handshake_limit_exceeded: peer.has_handshake_limit_exceeded(current_time),
keylist_limit_exceeded: peer.has_key_list_limit_exceeded(current_time),
limited_till: None,
});
}
peers
.peer_state_writer
.write_state(data, &mut self.network.io_interface)
.await
.unwrap();
self.peer_file_write_timer = 0;
*work_done = true;
}
}
}

#[async_trait]
Expand All @@ -538,7 +575,7 @@ impl ProcessEvent<RoutingEvent> for RoutingThread {
let time = self.timer.get_timestamp_in_ms();
peer.message_limiter.increase();
if peer.has_message_limit_exceeded(time) {
info!("limit exceeded for messages from peer : {:?}", peer_index);
info!("peers exceeded for messages from peer : {:?}", peer_index);
return None;
}
}
Expand Down Expand Up @@ -587,10 +624,9 @@ impl ProcessEvent<RoutingEvent> for RoutingThread {
let mut peers = self.network.peer_lock.write().await;
let peer = peers.find_peer_by_index_mut(peer_index)?;
let time = self.timer.get_timestamp_in_ms();
peer.invalid_block_limiter.increase();
if peer.has_invalid_block_limit_exceeded(time) {
info!(
"limit exceeded for invalid blocks from peer : {:?}. disconnecting peer...",
"peers exceeded for invalid blocks from peer : {:?}. disconnecting peer...",
peer_index
);
self.network
Expand Down Expand Up @@ -652,8 +688,12 @@ impl ProcessEvent<RoutingEvent> for RoutingThread {
let mut peers = self.network.peer_lock.write().await;
peers.remove_disconnected_peers(current_time);
self.peer_removal_timer = 0;
work_done = true;
}

self.write_peer_state_data(duration_value, &mut work_done)
.await;

if work_done {
return Some(());
}
Expand Down
2 changes: 1 addition & 1 deletion saito-core/src/core/util/crypto.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use rand::{thread_rng, Rng};
use secp256k1::ecdsa;
pub use secp256k1::{Message, PublicKey, SecretKey, SECP256K1};

type Aes128Cbc = Cbc<Aes128, Pkcs7>;
// type Aes128Cbc = Cbc<Aes128, Pkcs7>;

pub const PARALLEL_HASH_BYTE_THRESHOLD: usize = 128_000;

Expand Down
Loading

0 comments on commit 7c28788

Please sign in to comment.