From 192d0661e9a59210ab5706c5fcf92e4e6f99c852 Mon Sep 17 00:00:00 2001 From: Gerd Zellweger Date: Wed, 4 Sep 2024 15:08:47 -0700 Subject: [PATCH] Change with_tokio_rt to accept Arc. This allows to share tokio runtimes across different sub-systems inside your application. Signed-off-by: Gerd Zellweger --- actix-rt/examples/multi_thread_system.rs | 1 + actix-rt/src/arbiter.rs | 2 +- actix-rt/src/runtime.rs | 18 +++++++++++++----- actix-rt/src/system.rs | 6 +++--- actix-rt/tests/tests.rs | 4 +++- actix-server/src/worker.rs | 1 + 6 files changed, 22 insertions(+), 10 deletions(-) diff --git a/actix-rt/examples/multi_thread_system.rs b/actix-rt/examples/multi_thread_system.rs index 0ecd1ef1a0..ae8c262404 100644 --- a/actix-rt/examples/multi_thread_system.rs +++ b/actix-rt/examples/multi_thread_system.rs @@ -10,6 +10,7 @@ fn main() { .worker_threads(2) .enable_all() .build() + .map(std::sync::Arc::new) .unwrap() }) .block_on(async_main()); diff --git a/actix-rt/src/arbiter.rs b/actix-rt/src/arbiter.rs index 1da76c5255..b9c221aa87 100644 --- a/actix-rt/src/arbiter.rs +++ b/actix-rt/src/arbiter.rs @@ -109,7 +109,7 @@ impl Arbiter { #[cfg(not(all(target_os = "linux", feature = "io-uring")))] pub fn with_tokio_rt(runtime_factory: F) -> Arbiter where - F: FnOnce() -> tokio::runtime::Runtime + Send + 'static, + F: FnOnce() -> std::sync::Arc + Send + 'static, { let sys = System::current(); let system_id = sys.id(); diff --git a/actix-rt/src/runtime.rs b/actix-rt/src/runtime.rs index 55e29a777d..bc12947c95 100644 --- a/actix-rt/src/runtime.rs +++ b/actix-rt/src/runtime.rs @@ -1,5 +1,4 @@ -use std::{future::Future, io}; - +use std::{sync::Arc, future::Future, io}; use tokio::task::{JoinHandle, LocalSet}; /// A Tokio-based runtime proxy. @@ -9,14 +8,14 @@ use tokio::task::{JoinHandle, LocalSet}; #[derive(Debug)] pub struct Runtime { local: LocalSet, - rt: tokio::runtime::Runtime, + rt: Arc, } -pub(crate) fn default_tokio_runtime() -> io::Result { +pub(crate) fn default_tokio_runtime() -> io::Result> { tokio::runtime::Builder::new_current_thread() .enable_io() .enable_time() - .build() + .build().map(Arc::new) } impl Runtime { @@ -141,6 +140,15 @@ impl Runtime { impl From for Runtime { fn from(rt: tokio::runtime::Runtime) -> Self { + Self { + local: LocalSet::new(), + rt: Arc::new(rt), + } + } +} + +impl From> for Runtime { + fn from(rt: Arc) -> Self { Self { local: LocalSet::new(), rt, diff --git a/actix-rt/src/system.rs b/actix-rt/src/system.rs index f9fe4c8dc1..f75e2e5f02 100644 --- a/actix-rt/src/system.rs +++ b/actix-rt/src/system.rs @@ -5,9 +5,9 @@ use std::{ io, pin::Pin, sync::atomic::{AtomicUsize, Ordering}, + sync::Arc, task::{Context, Poll}, }; - use futures_core::ready; use tokio::sync::{mpsc, oneshot}; @@ -48,7 +48,7 @@ impl System { /// [tokio-runtime]: tokio::runtime::Runtime pub fn with_tokio_rt(runtime_factory: F) -> SystemRunner where - F: FnOnce() -> tokio::runtime::Runtime, + F: FnOnce() -> Arc, { let (stop_tx, stop_rx) = oneshot::channel(); let (sys_tx, sys_rx) = mpsc::unbounded_channel(); @@ -87,7 +87,7 @@ impl System { #[doc(hidden)] pub fn with_tokio_rt(_: F) -> SystemRunner where - F: FnOnce() -> tokio::runtime::Runtime, + F: FnOnce() -> Arc, { unimplemented!("System::with_tokio_rt is not implemented for io-uring feature yet") } diff --git a/actix-rt/tests/tests.rs b/actix-rt/tests/tests.rs index 330e27ff94..272e50c3d1 100644 --- a/actix-rt/tests/tests.rs +++ b/actix-rt/tests/tests.rs @@ -6,7 +6,7 @@ use std::{ use actix_rt::{task::JoinError, Arbiter, System}; #[cfg(not(feature = "io-uring"))] use { - std::{sync::mpsc::channel, thread}, + std::{sync::Arc, sync::mpsc::channel, thread}, tokio::sync::oneshot, }; @@ -250,6 +250,7 @@ fn new_system_with_tokio() { .on_thread_start(|| {}) .on_thread_stop(|| {}) .build() + .map(Arc::new) .unwrap() }) .block_on(async { @@ -282,6 +283,7 @@ fn new_arbiter_with_tokio() { tokio::runtime::Builder::new_current_thread() .enable_all() .build() + .map(Arc::new) .unwrap() }); diff --git a/actix-server/src/worker.rs b/actix-server/src/worker.rs index 7050fcd257..4a5a0f1632 100644 --- a/actix-server/src/worker.rs +++ b/actix-server/src/worker.rs @@ -426,6 +426,7 @@ impl ServerWorker { .enable_all() .max_blocking_threads(config.max_blocking_threads) .build() + .map(Arc::new) .unwrap() }) };