Skip to content

Commit

Permalink
remember worker_count passed from configuration
Browse files Browse the repository at this point in the history
tokio's metric APIs is not stablized yet:
tokio-rs/tokio#4073
  • Loading branch information
zonyitoo committed Apr 24, 2022
1 parent 9e13020 commit f059d4a
Show file tree
Hide file tree
Showing 10 changed files with 113 additions and 77 deletions.
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,7 @@ libc = "0.2"

futures = "0.3"
tokio = { version = "1", features = ["rt", "signal"] }
num_cpus = "1.13"

ipnet = { version = "2.3", optional = true }

Expand Down
1 change: 0 additions & 1 deletion crates/shadowsocks-service/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,6 @@ async-trait = "0.1"

socket2 = { version = "0.4", features = ["all"] }
libc = "0.2"
num_cpus = "1.13"

hyper = { version = "0.14.16", optional = true, features = ["full"] }
tower = { version = "0.4", optional = true }
Expand Down
7 changes: 7 additions & 0 deletions crates/shadowsocks-service/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1082,6 +1082,11 @@ pub struct Config {
/// Configuration file path, the actual path of the configuration.
/// This is normally for auto-reloading if implementation supports.
pub config_path: Option<PathBuf>,

#[doc(hidden)]
/// Workers in runtime
/// It should be replaced with metrics APIs: https://github.com/tokio-rs/tokio/issues/4073
pub worker_count: usize,
}

/// Configuration parsing error kind
Expand Down Expand Up @@ -1192,6 +1197,8 @@ impl Config {
balancer: BalancerConfig::default(),

config_path: None,

worker_count: 1,
}
}

Expand Down
12 changes: 12 additions & 0 deletions crates/shadowsocks-service/src/manager/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ pub struct Manager {
acl: Option<Arc<AccessControl>>,
ipv6_first: bool,
security: SecurityConfig,
worker_count: usize,
}

impl Manager {
Expand All @@ -103,6 +104,7 @@ impl Manager {
acl: None,
ipv6_first: false,
security: SecurityConfig::default(),
worker_count: 1,
}
}

Expand Down Expand Up @@ -152,6 +154,14 @@ impl Manager {
self.security = security;
}

/// Set runtime worker count
///
/// Should be replaced with tokio's metric API when it is stablized.
/// https://github.com/tokio-rs/tokio/issues/4073
pub fn set_worker_count(&mut self, worker_count: usize) {
self.worker_count = worker_count;
}

/// Start serving
pub async fn run(self) -> io::Result<()> {
let mut listener = ManagerListener::bind(&self.context, &self.svr_cfg.addr).await?;
Expand Down Expand Up @@ -235,6 +245,8 @@ impl Manager {

server.set_security_config(&self.security);

server.set_worker_count(self.worker_count);

let server_port = server.config().addr().port();

let mut servers = self.servers.lock().await;
Expand Down
4 changes: 4 additions & 0 deletions crates/shadowsocks-service/src/server/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,10 @@ pub async fn run(config: Config) -> io::Result<()> {
server.set_ipv6_first(config.ipv6_first);
}

if config.worker_count >= 1 {
server.set_worker_count(config.worker_count);
}

server.set_security_config(&config.security);

servers.push(server);
Expand Down
13 changes: 12 additions & 1 deletion crates/shadowsocks-service/src/server/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ pub struct Server {
udp_capacity: Option<usize>,
manager_addr: Option<ManagerAddr>,
accept_opts: AcceptOpts,
worker_count: usize,
}

impl Server {
Expand All @@ -47,6 +48,7 @@ impl Server {
udp_capacity: None,
manager_addr: None,
accept_opts: AcceptOpts::default(),
worker_count: 1,
}
}

Expand Down Expand Up @@ -81,6 +83,14 @@ impl Server {
self.manager_addr = Some(manager_addr);
}

/// Set runtime worker count
///
/// Should be replaced with tokio's metric API when it is stablized.
/// https://github.com/tokio-rs/tokio/issues/4073
pub fn set_worker_count(&mut self, worker_count: usize) {
self.worker_count = worker_count;
}

/// Get server's configuration
pub fn config(&self) -> &ServerConfig {
&self.svr_cfg
Expand Down Expand Up @@ -169,13 +179,14 @@ impl Server {
}

async fn run_udp_server(&self) -> io::Result<()> {
let server = UdpServer::new(
let mut server = UdpServer::new(
self.context.clone(),
self.svr_cfg.method(),
self.udp_expiry_duration,
self.udp_capacity,
self.accept_opts.clone(),
);
server.set_worker_count(self.worker_count);
server.run(&self.svr_cfg).await
}

Expand Down
140 changes: 66 additions & 74 deletions crates/shadowsocks-service/src/server/udprelay.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ pub struct UdpServer {
keepalive_rx: mpsc::Receiver<NatKey>,
time_to_live: Duration,
accept_opts: AcceptOpts,
worker_count: usize,
}

impl UdpServer {
Expand Down Expand Up @@ -127,9 +128,15 @@ impl UdpServer {
keepalive_rx,
time_to_live,
accept_opts,
worker_count: 1,
}
}

#[inline]
pub fn set_worker_count(&mut self, worker_count: usize) {
self.worker_count = worker_count;
}

pub async fn run(mut self, svr_cfg: &ServerConfig) -> io::Result<()> {
let socket = ProxySocket::bind_with_opts(self.context.context(), svr_cfg, self.accept_opts.clone()).await?;

Expand All @@ -145,63 +152,35 @@ impl UdpServer {

let mut orx_opt = None;

let cpus = num_cpus::get();
let mut other_cores = Vec::new();
let cpus = self.worker_count;
let mut other_receivers = Vec::new();
if cpus > 1 {
let (otx, orx) = mpsc::channel(64);
let (otx, orx) = mpsc::channel((cpus - 1) * 16);
orx_opt = Some(orx);

other_cores.reserve(cpus - 1);
other_receivers.reserve(cpus - 1);
trace!("udp server starting extra {} recv workers", cpus - 1);

for _ in 1..cpus {
let otx = otx.clone();
let listener = listener.clone();
let context = self.context.clone();

other_cores.push(tokio::spawn(async move {
other_receivers.push(tokio::spawn(async move {
let mut buffer = [0u8; MAXIMUM_UDP_PAYLOAD_SIZE];

loop {
let (n, peer_addr, target_addr, control) = match listener.recv_from_with_ctrl(&mut buffer).await
{
Ok(s) => s,
Err(err) => {
error!("udp server recv_from failed with error: {}", err);
continue;
}
};

if n == 0 {
// For windows, it will generate a ICMP Port Unreachable Message
// https://docs.microsoft.com/en-us/windows/win32/api/winsock2/nf-winsock2-recvfrom
// Which will result in recv_from return 0.
//
// It cannot be solved here, because `WSAGetLastError` is already set.
//
// See `relay::udprelay::utils::create_socket` for more detail.
continue;
}

if context.check_client_blocked(&peer_addr) {
warn!(
"udp client {} outbound {} access denied by ACL rules",
peer_addr, target_addr
);
continue;
}
let (n, peer_addr, target_addr, control) =
match UdpServer::recv_one_packet(&context, &listener, &mut buffer).await {
Some(s) => s,
None => continue,
};

if context.check_outbound_blocked(&target_addr).await {
warn!("udp client {} outbound {} blocked by ACL rules", peer_addr, target_addr);
continue;
}

let r = otx
if let Err(..) = otx
.send((peer_addr, target_addr, control, Bytes::copy_from_slice(&buffer[..n])))
.await;

// If Result is error, the channel receiver is closed. We should exit the task.
if r.is_err() {
.await
{
// If Result is error, the channel receiver is closed. We should exit the task.
break;
}
}
Expand All @@ -222,7 +201,7 @@ impl UdpServer {
}

let _guard = MulticoreTaskGuard {
tasks: &mut other_cores,
tasks: &mut other_receivers,
};

#[inline]
Expand Down Expand Up @@ -251,39 +230,12 @@ impl UdpServer {
self.assoc_map.keep_alive(&peer_addr);
}

recv_result = listener.recv_from_with_ctrl(&mut buffer) => {
recv_result = UdpServer::recv_one_packet(&self.context, &listener, &mut buffer) => {
let (n, peer_addr, target_addr, control) = match recv_result {
Ok(s) => s,
Err(err) => {
error!("udp server recv_from failed with error: {}", err);
continue;
}
Some(s) => s,
None => continue,
};

if n == 0 {
// For windows, it will generate a ICMP Port Unreachable Message
// https://docs.microsoft.com/en-us/windows/win32/api/winsock2/nf-winsock2-recvfrom
// Which will result in recv_from return 0.
//
// It cannot be solved here, because `WSAGetLastError` is already set.
//
// See `relay::udprelay::utils::create_socket` for more detail.
continue;
}

if self.context.check_client_blocked(&peer_addr) {
warn!(
"udp client {} outbound {} access denied by ACL rules",
peer_addr, target_addr
);
continue;
}

if self.context.check_outbound_blocked(&target_addr).await {
warn!("udp client {} outbound {} blocked by ACL rules", peer_addr, target_addr);
continue;
}

let data = &buffer[..n];
if let Err(err) = self.send_packet(&listener, peer_addr, target_addr, control, Bytes::copy_from_slice(data)).await {
debug!(
Expand All @@ -295,7 +247,7 @@ impl UdpServer {
}
}

recv_result = multicore_recv(&mut orx_opt) => {
recv_result = multicore_recv(&mut orx_opt), if orx_opt.is_some() => {
let (peer_addr, target_addr, control, data) = recv_result;
let data_len = data.len();
if let Err(err) = self.send_packet(&listener, peer_addr, target_addr, control, data).await {
Expand All @@ -311,6 +263,46 @@ impl UdpServer {
}
}

async fn recv_one_packet(
context: &ServiceContext,
l: &MonProxySocket,
buffer: &mut [u8],
) -> Option<(usize, SocketAddr, Address, Option<UdpSocketControlData>)> {
let (n, peer_addr, target_addr, control) = match l.recv_from_with_ctrl(buffer).await {
Ok(s) => s,
Err(err) => {
error!("udp server recv_from failed with error: {}", err);
return None;
}
};

if n == 0 {
// For windows, it will generate a ICMP Port Unreachable Message
// https://docs.microsoft.com/en-us/windows/win32/api/winsock2/nf-winsock2-recvfrom
// Which will result in recv_from return 0.
//
// It cannot be solved here, because `WSAGetLastError` is already set.
//
// See `relay::udprelay::utils::create_socket` for more detail.
return None;
}

if context.check_client_blocked(&peer_addr) {
warn!(
"udp client {} outbound {} access denied by ACL rules",
peer_addr, target_addr
);
return None;
}

if context.check_outbound_blocked(&target_addr).await {
warn!("udp client {} outbound {} blocked by ACL rules", peer_addr, target_addr);
return None;
}

Some((n, peer_addr, target_addr, control))
}

async fn send_packet(
&mut self,
listener: &Arc<MonProxySocket>,
Expand Down
5 changes: 5 additions & 0 deletions src/service/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -451,18 +451,23 @@ pub fn main(matches: &ArgMatches) {

info!("shadowsocks manager {} build {}", crate::VERSION, crate::BUILD_TIME);

let mut worker_count = 1;
let mut builder = match service_config.runtime.mode {
RuntimeMode::SingleThread => Builder::new_current_thread(),
#[cfg(feature = "multi-threaded")]
RuntimeMode::MultiThread => {
let mut builder = Builder::new_multi_thread();
if let Some(worker_threads) = service_config.runtime.worker_count {
worker_count = worker_threads;
builder.worker_threads(worker_threads);
} else {
worker_count = num_cpus::get();
}

builder
}
};
config.worker_count = worker_count;

let runtime = builder.enable_all().build().expect("create tokio Runtime");

Expand Down
Loading

0 comments on commit f059d4a

Please sign in to comment.