Merge pull request #2066 from subspace/backport-rate-limiter

Backport and tweak RateLimiter to gemini-3f

nazar-pc authored Oct 6, 2023
2 parents ce5a62b + 38880eb commit 5e3f8ef

Showing 18 changed files with 679 additions and 392 deletions.
2 changes: 1 addition & 1 deletion crates/subspace-farmer/src/piece_cache.rs
@@ -22,7 +22,7 @@ const WORKER_CHANNEL_CAPACITY: usize = 100;
 /// this number defines an interval in pieces after which cache is updated
 const INTERMEDIATE_CACHE_UPDATE_INTERVAL: usize = 100;
 /// Get piece retry attempts number.
-const PIECE_GETTER_RETRY_NUMBER: NonZeroU16 = NonZeroU16::new(3).expect("Not zero; qed");
+const PIECE_GETTER_RETRY_NUMBER: NonZeroU16 = NonZeroU16::new(4).expect("Not zero; qed");
 
 #[derive(Debug, Clone)]
 struct DiskPieceCacheState {
2 changes: 1 addition & 1 deletion crates/subspace-farmer/src/single_disk_farm/plotting.rs
@@ -38,7 +38,7 @@ const FARMER_APP_INFO_RETRY_INTERVAL: Duration = Duration::from_millis(500);
 /// Size of the cache of archived segments for the purposes of faster sector expiration checks.
 const ARCHIVED_SEGMENTS_CACHE_SIZE: NonZeroUsize = NonZeroUsize::new(1000).expect("Not zero; qed");
 /// Get piece retry attempts number.
-const PIECE_GETTER_RETRY_NUMBER: NonZeroU16 = NonZeroU16::new(3).expect("Not zero; qed");
+const PIECE_GETTER_RETRY_NUMBER: NonZeroU16 = NonZeroU16::new(4).expect("Not zero; qed");
 
 /// Errors that happen during plotting
 #[derive(Debug, Error)]
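Both this file and piece_cache.rs above bump the retry constant from 3 to 4 attempts. As a rough sketch (not the farmer's actual retry code), a NonZeroU16 attempt count typically drives a bounded loop along these lines, with `getter` standing in for the real piece retrieval call:

use std::num::NonZeroU16;

// Illustrative sketch only: retry a fallible getter up to `attempts` times,
// in the spirit of PIECE_GETTER_RETRY_NUMBER above.
fn get_with_retries<T, E>(
    attempts: NonZeroU16,
    mut getter: impl FnMut() -> Result<T, E>,
) -> Result<T, E> {
    let mut result = getter();
    // The first attempt is already made, so retry at most attempts - 1 more times.
    for _ in 1..attempts.get() {
        if result.is_ok() {
            break;
        }
        result = getter();
    }
    result
}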
116 changes: 116 additions & 0 deletions crates/subspace-networking/examples/benchmark.rs
@@ -1,6 +1,8 @@
 use clap::Parser;
 use futures::channel::oneshot;
 use futures::future::pending;
+use futures::stream::FuturesUnordered;
+use futures::StreamExt;
 use libp2p::identity::Keypair;
 use libp2p::kad::Mode;
 use libp2p::multiaddr::Protocol;
@@ -11,6 +13,7 @@ use std::time::{Duration, Instant};
 use subspace_core_primitives::PieceIndex;
 use subspace_networking::utils::piece_provider::{NoPieceValidator, PieceProvider, RetryPolicy};
 use subspace_networking::{Config, Node, PeerInfoProvider, PieceByIndexRequestHandler};
+use tokio::sync::Semaphore;
 use tracing::{error, info, warn, Level};
 use tracing_subscriber::fmt::Subscriber;
 use tracing_subscriber::util::SubscriberInitExt;
@@ -28,6 +31,12 @@ struct Args {
     /// production use.
     #[arg(long, required = true)]
     protocol_version: String,
+    /// Defines max established outgoing connections limit for the peer.
+    #[arg(long, default_value_t = 100)]
+    out_peers: u32,
+    /// Defines max pending outgoing connections limit for the peer.
+    #[arg(long, default_value_t = 100)]
+    pending_out_peers: u32,
     #[clap(subcommand)]
     command: Command,
 }
@@ -40,6 +49,16 @@ enum Command {
         #[arg(long, default_value_t = 0)]
         start_with: usize,
     },
+    Parallel {
+        #[arg(long, default_value_t = 100)]
+        max_pieces: usize,
+        #[arg(long, default_value_t = 0)]
+        start_with: usize,
+        #[arg(long, default_value_t = 0)]
+        retries: u16,
+        #[arg(long, default_value_t = 1)]
+        parallelism_level: u16,
+    },
 }
 
 #[tokio::main]
@@ -54,6 +73,8 @@ async fn main() {
         args.bootstrap_nodes,
         args.protocol_version,
         args.enable_private_ips,
+        args.pending_out_peers,
+        args.out_peers,
     )
     .await;
 
@@ -64,6 +85,14 @@
         } => {
             simple_benchmark(node, max_pieces, start_with).await;
         }
+        Command::Parallel {
+            max_pieces,
+            start_with,
+            retries,
+            parallelism_level,
+        } => {
+            parallel_benchmark(node, max_pieces, start_with, retries, parallelism_level).await;
+        }
     }
 
     info!("Exiting..");
@@ -144,10 +173,95 @@ async fn simple_benchmark(node: Node, max_pieces: usize, start_with: usize) {
     stats.display();
 }
 
+async fn parallel_benchmark(
+    node: Node,
+    max_pieces: usize,
+    start_with: usize,
+    retries: u16,
+    parallelism_level: u16,
+) {
+    let start = Instant::now();
+    let mut stats = PieceRequestStats::default();
+    if max_pieces == 0 {
+        error!("Incorrect max_pieces variable set:{max_pieces}");
+        return;
+    }
+
+    let semaphore = &Semaphore::new(parallelism_level.into());
+
+    let piece_provider = &PieceProvider::<NoPieceValidator>::new(node, None);
+    let mut total_duration = Duration::default();
+    let mut pure_total_duration = Duration::default();
+    let mut pending_pieces = (start_with..(start_with + max_pieces))
+        .map(|i| {
+            let piece_index = PieceIndex::from(i as u64);
+            async move {
+                let start = Instant::now();
+
+                let permit = semaphore
+                    .acquire()
+                    .await
+                    .expect("Semaphore cannot be closed.");
+                let semaphore_acquired = Instant::now();
+                let maybe_piece = piece_provider
+                    .get_piece(piece_index, RetryPolicy::Limited(retries))
+                    .await;
+
+                let end = Instant::now();
+                let pure_duration = end.duration_since(semaphore_acquired);
+                let full_duration = end.duration_since(start);
+
+                drop(permit);
+
+                (piece_index, maybe_piece, pure_duration, full_duration)
+            }
+        })
+        .collect::<FuturesUnordered<_>>();
+
+    while let Some((piece_index, maybe_piece, pure_duration, full_duration)) =
+        pending_pieces.next().await
+    {
+        total_duration += full_duration;
+        pure_total_duration += pure_duration;
+        match maybe_piece {
+            Ok(Some(_)) => {
+                info!(%piece_index, ?pure_duration, ?full_duration, "Piece found.");
+                stats.add_found();
+            }
+            Ok(None) => {
+                warn!(%piece_index, ?pure_duration, ?full_duration, "Piece not found.");
+                stats.add_not_found();
+            }
+            Err(error) => {
+                error!(%piece_index, ?pure_duration, ?full_duration, ?error, "Piece request failed.");
+                stats.add_error();
+            }
+        }
+    }
+
+    let average_duration = total_duration / max_pieces as u32;
+    let average_pure_duration = pure_total_duration / max_pieces as u32;
+    info!(
+        "Total time for {max_pieces} pieces: {:?}",
+        Instant::now().duration_since(start)
+    );
+    info!(
+        "Average time for {max_pieces} pieces: {:?}",
+        average_duration
+    );
+    info!(
+        "Average (no wait) time for {max_pieces} pieces: {:?}",
+        average_pure_duration
+    );
+    stats.display();
+}
+
 pub async fn configure_dsn(
     bootstrap_addresses: Vec<Multiaddr>,
     protocol_prefix: String,
     enable_private_ips: bool,
+    pending_out_peers: u32,
+    out_peers: u32,
 ) -> Node {
     let keypair = Keypair::generate_ed25519();
 
@@ -160,6 +274,8 @@ pub async fn configure_dsn(
         request_response_protocols: vec![PieceByIndexRequestHandler::create(|_, _| async { None })],
         bootstrap_addresses,
         enable_autonat: false,
+        max_pending_outgoing_connections: pending_out_peers,
+        max_established_outgoing_connections: out_peers,
         ..default_config
     };
     let (node, mut node_runner_1) = subspace_networking::construct(config).unwrap();
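The new Parallel command combines a Semaphore with FuturesUnordered: all request futures are created up front, but at most parallelism_level of them hold a permit at once. Timing from creation (full_duration) and from permit acquisition (pure_duration) separates queueing delay from actual retrieval time. A minimal self-contained sketch of the same pattern, with a sleep standing in for get_piece():

use std::time::{Duration, Instant};

use futures::stream::{FuturesUnordered, StreamExt};
use tokio::sync::Semaphore;

#[tokio::main]
async fn main() {
    // At most 4 tasks hold a permit at once, mirroring parallelism_level.
    let semaphore = &Semaphore::new(4);
    let mut tasks = (0..16)
        .map(|i| async move {
            let created = Instant::now();
            let _permit = semaphore.acquire().await.expect("Semaphore cannot be closed");
            let acquired = Instant::now();
            // Stand-in for piece_provider.get_piece().
            tokio::time::sleep(Duration::from_millis(10)).await;
            (i, acquired.elapsed(), created.elapsed())
        })
        .collect::<FuturesUnordered<_>>();
    while let Some((i, pure, full)) = tasks.next().await {
        println!("task {i}: pure {pure:?}, full {full:?}");
    }
}

A hypothetical invocation of the benchmark (clap kebab-cases the field names shown above; flags not shown in this excerpt, such as the bootstrap-node list, are omitted): benchmark --protocol-version <version> --out-peers 100 parallel --max-pieces 1000 --retries 1 --parallelism-level 10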
39 changes: 7 additions & 32 deletions crates/subspace-networking/src/constructor.rs
@@ -14,7 +14,8 @@ use crate::protocols::peer_info::PeerInfoProvider;
 use crate::protocols::request_response::request_response_factory::RequestHandler;
 use crate::protocols::reserved_peers::Config as ReservedPeersConfig;
 use crate::shared::Shared;
-use crate::utils::{strip_peer_id, ResizableSemaphore};
+use crate::utils::rate_limiter::RateLimiter;
+use crate::utils::strip_peer_id;
 use crate::{PeerInfo, PeerInfoConfig};
 use backoff::{ExponentialBackoff, SystemClock};
 use futures::channel::mpsc;
@@ -80,29 +81,6 @@ const SWARM_MAX_ESTABLISHED_CONNECTIONS_PER_PEER: Option<u32> = Some(3);
 // use-case for gossipsub protocol.
 const ENABLE_GOSSIP_PROTOCOL: bool = false;
 
-/// Base limit for number of concurrent tasks initiated towards Kademlia.
-///
-/// We restrict this so we can manage outgoing requests a bit better by cancelling low-priority
-/// requests, but this value will be boosted depending on number of connected peers.
-const KADEMLIA_BASE_CONCURRENT_TASKS: NonZeroUsize = NonZeroUsize::new(15).expect("Not zero; qed");
-/// Above base limit will be boosted by specified number for every peer connected starting with
-/// second peer, such that it scaled with network connectivity, but the exact coefficient might need
-/// to be tweaked in the future.
-pub(crate) const KADEMLIA_CONCURRENT_TASKS_BOOST_PER_PEER: usize = 15;
-/// Base limit for number of any concurrent tasks except Kademlia.
-///
-/// We configure total number of streams per connection to 256. Here we assume half of them might be
-/// incoming and half outgoing, we also leave a small buffer of streams just in case.
-///
-/// We restrict this so we don't exceed number of streams for single peer, but this value will be
-/// boosted depending on number of connected peers.
-const REGULAR_BASE_CONCURRENT_TASKS: NonZeroUsize =
-    NonZeroUsize::new(50 - KADEMLIA_BASE_CONCURRENT_TASKS.get()).expect("Not zero; qed");
-/// Above base limit will be boosted by specified number for every peer connected starting with
-/// second peer, such that it scaled with network connectivity, but the exact coefficient might need
-/// to be tweaked in the future.
-pub(crate) const REGULAR_CONCURRENT_TASKS_BOOST_PER_PEER: usize = 25;
-
 const TEMPORARY_BANS_CACHE_SIZE: NonZeroUsize = NonZeroUsize::new(10_000).expect("Not zero; qed");
 const TEMPORARY_BANS_DEFAULT_BACKOFF_INITIAL_INTERVAL: Duration = Duration::from_secs(5);
 const TEMPORARY_BANS_DEFAULT_BACKOFF_RANDOMIZATION_FACTOR: f64 = 0.1;
@@ -524,15 +502,12 @@ where
     // Create final structs
     let (command_sender, command_receiver) = mpsc::channel(1);
 
-    let kademlia_tasks_semaphore = ResizableSemaphore::new(KADEMLIA_BASE_CONCURRENT_TASKS);
-    let regular_tasks_semaphore = ResizableSemaphore::new(REGULAR_BASE_CONCURRENT_TASKS);
+    let rate_limiter = RateLimiter::new(
+        max_established_outgoing_connections,
+        max_pending_outgoing_connections,
+    );
 
-    let shared = Arc::new(Shared::new(
-        local_peer_id,
-        command_sender,
-        kademlia_tasks_semaphore,
-        regular_tasks_semaphore,
-    ));
+    let shared = Arc::new(Shared::new(local_peer_id, command_sender, rate_limiter));
     let shared_weak = Arc::downgrade(&shared);
 
     let node = Node::new(shared);
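The backported RateLimiter replaces the two ResizableSemaphores and the per-peer boost constants deleted above: instead of growing capacity for every connected peer, limits are derived once from the configured connection limits. Its internals are not shown in this excerpt; purely as a sketch of the shape implied by the call site above, with assumed fields and sizing rather than the real implementation:

use tokio::sync::{Semaphore, SemaphorePermit};

// Sketch only: the real crate::utils::rate_limiter::RateLimiter is not shown
// in this diff. The fields and sizing below are assumptions inferred from the
// RateLimiter::new(out_connections, pending_out_connections) call site.
pub(crate) struct RateLimiter {
    kademlia_tasks: Semaphore,
    regular_tasks: Semaphore,
}

impl RateLimiter {
    pub(crate) fn new(out_connections: u32, pending_out_connections: u32) -> Self {
        // Assumed sizing: scale task capacity with how many outgoing
        // connections the swarm may establish or have pending.
        let capacity = (out_connections + pending_out_connections).max(1) as usize;
        Self {
            kademlia_tasks: Semaphore::new(capacity),
            regular_tasks: Semaphore::new(capacity),
        }
    }

    pub(crate) async fn acquire_kademlia_permit(&self) -> SemaphorePermit<'_> {
        self.kademlia_tasks.acquire().await.expect("Never closed")
    }

    pub(crate) async fn acquire_regular_permit(&self) -> SemaphorePermit<'_> {
        self.regular_tasks.acquire().await.expect("Never closed")
    }
}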
(Diff for the remaining 14 changed files not shown.)