Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add support of a unix-like poll function #1046

Merged
merged 19 commits into from
Feb 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
69 changes: 59 additions & 10 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,6 @@ num-derive = "0.4"
num-traits = { version = "0.2", default-features = false }
pci-ids = { version = "0.2", optional = true }
pci_types = { version = "0.6" }
pflock = "0.2"
rand_chacha = { version = "0.3", default-features = false }
shell-words = { version = "1.1", default-features = false }
smallvec = { version = "1", features = ["const_new"] }
Expand All @@ -94,6 +93,8 @@ talc = { version = "4" }
time = { version = "0.3", default-features = false }
zerocopy = { version = "0.7", features = ["derive"] }
build-time = "0.1.3"
async-trait = "0.1.48"
async-lock = { version = "3.3.0", default-features = false }

[dependencies.smoltcp]
version = "0.11"
Expand Down
18 changes: 10 additions & 8 deletions src/console.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use hermit_sync::InterruptTicketMutex;

use crate::arch;

pub struct Console(());
pub(crate) struct Console(());

/// A collection of methods that are required to format
/// a message to Hermit's console.
Expand All @@ -21,14 +21,16 @@ impl fmt::Write for Console {
}
}

impl Console {
#[inline]
pub fn write_all(&mut self, buf: &[u8]) {
arch::output_message_buf(buf)
}
}

#[cfg(feature = "newlib")]
pub static CONSOLE: InterruptTicketMutex<Console> = InterruptTicketMutex::new(Console(()));
#[cfg(not(feature = "newlib"))]
static CONSOLE: InterruptTicketMutex<Console> = InterruptTicketMutex::new(Console(()));

#[doc(hidden)]
pub fn _print(args: ::core::fmt::Arguments<'_>) {
use core::fmt::Write;
CONSOLE.lock().write_fmt(args).unwrap();
}

#[cfg(all(test, not(target_os = "none")))]
mod tests {
Expand Down
214 changes: 214 additions & 0 deletions src/executor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,34 @@ use alloc::task::Wake;
use core::future::Future;
use core::sync::atomic::AtomicU32;
use core::task::{Context, Poll, Waker};
use core::time::Duration;

use crossbeam_utils::Backoff;
use hermit_sync::without_interrupts;
#[cfg(any(feature = "tcp", feature = "udp"))]
use smoltcp::time::Instant;

use crate::arch::core_local::*;
#[cfg(all(
any(feature = "tcp", feature = "udp"),
not(feature = "pci"),
not(feature = "newlib")
))]
use crate::drivers::mmio::get_network_driver;
#[cfg(all(any(feature = "tcp", feature = "udp"), not(feature = "newlib")))]
use crate::drivers::net::NetworkDriver;
#[cfg(all(
any(feature = "tcp", feature = "udp"),
feature = "pci",
not(feature = "newlib")
))]
use crate::drivers::pci::get_network_driver;
#[cfg(all(any(feature = "tcp", feature = "udp"), not(feature = "newlib")))]
use crate::executor::network::network_delay;
use crate::executor::task::AsyncTask;
use crate::fd::IoError;
#[cfg(all(any(feature = "tcp", feature = "udp"), not(feature = "newlib")))]
use crate::scheduler::PerCoreSchedulerExt;
use crate::synch::futex::*;

struct TaskNotify {
Expand Down Expand Up @@ -77,3 +100,194 @@ pub fn init() {
#[cfg(all(any(feature = "tcp", feature = "udp"), not(feature = "newlib")))]
crate::executor::network::init();
}

#[inline]
pub(crate) fn now() -> u64 {
crate::arch::kernel::systemtime::now_micros()
}

/// Blocks the current thread on `f`, running the executor when idling.
pub(crate) fn poll_on<F, T>(future: F, timeout: Option<Duration>) -> Result<T, IoError>
where
F: Future<Output = Result<T, IoError>>,
{
#[cfg(any(feature = "tcp", feature = "udp"))]
let nic = get_network_driver();

// disable network interrupts
#[cfg(any(feature = "tcp", feature = "udp"))]
let no_retransmission = if let Some(nic) = nic {
let mut guard = nic.lock();
guard.set_polling_mode(true);
guard.get_checksums().tcp.tx()
} else {
true
};

let start = now();
let waker = core::task::Waker::noop();
let mut cx = Context::from_waker(&waker);
let mut future = future;
let mut future = unsafe { core::pin::Pin::new_unchecked(&mut future) };

loop {
// run background tasks
run();

if let Poll::Ready(t) = future.as_mut().poll(&mut cx) {
#[cfg(any(feature = "tcp", feature = "udp"))]
if !no_retransmission {
let wakeup_time =
network_delay(Instant::from_micros_const(now().try_into().unwrap()))
.map(|d| crate::arch::processor::get_timer_ticks() + d.total_micros());
core_scheduler().add_network_timer(wakeup_time);
}

// allow network interrupts
#[cfg(any(feature = "tcp", feature = "udp"))]
if let Some(nic) = nic {
nic.lock().set_polling_mode(false);
}

return t;
}

if let Some(duration) = timeout {
if Duration::from_micros(now() - start) >= duration {
#[cfg(any(feature = "tcp", feature = "udp"))]
if !no_retransmission {
let wakeup_time =
network_delay(Instant::from_micros_const(now().try_into().unwrap()))
.map(|d| crate::arch::processor::get_timer_ticks() + d.total_micros());
core_scheduler().add_network_timer(wakeup_time);
}

// allow network interrupts
#[cfg(any(feature = "tcp", feature = "udp"))]
if let Some(nic) = nic {
nic.lock().set_polling_mode(false);
}

return Err(IoError::ETIME);
}
}
}
}

/// Blocks the current thread on `f`, running the executor when idling.
pub(crate) fn block_on<F, T>(future: F, timeout: Option<Duration>) -> Result<T, IoError>
where
F: Future<Output = Result<T, IoError>>,
{
#[cfg(any(feature = "tcp", feature = "udp"))]
let nic = get_network_driver();

// disable network interrupts
#[cfg(any(feature = "tcp", feature = "udp"))]
let no_retransmission = if let Some(nic) = nic {
let mut guard = nic.lock();
guard.set_polling_mode(true);
!guard.get_checksums().tcp.tx()
} else {
true
};

let backoff = Backoff::new();
let start = now();
let task_notify = Arc::new(TaskNotify::new());
let waker = task_notify.clone().into();
let mut cx = Context::from_waker(&waker);
let mut future = future;
let mut future = unsafe { core::pin::Pin::new_unchecked(&mut future) };

loop {
// run background tasks
run();

let now = now();
if let Poll::Ready(t) = future.as_mut().poll(&mut cx) {
#[cfg(any(feature = "tcp", feature = "udp"))]
if !no_retransmission {
let network_timer =
network_delay(Instant::from_micros_const(now.try_into().unwrap()))
.map(|d| crate::arch::processor::get_timer_ticks() + d.total_micros());
core_scheduler().add_network_timer(network_timer);
}

// allow network interrupts
#[cfg(any(feature = "tcp", feature = "udp"))]
if let Some(nic) = nic {
nic.lock().set_polling_mode(false);
}

return t;
}

if let Some(duration) = timeout {
if Duration::from_micros(now - start) >= duration {
#[cfg(any(feature = "tcp", feature = "udp"))]
if !no_retransmission {
let network_timer =
network_delay(Instant::from_micros_const(now.try_into().unwrap()))
.map(|d| crate::arch::processor::get_timer_ticks() + d.total_micros());
core_scheduler().add_network_timer(network_timer);
}

// allow network interrupts
#[cfg(any(feature = "tcp", feature = "udp"))]
if let Some(nic) = nic {
nic.lock().set_polling_mode(false);
}

return Err(IoError::ETIME);
}
}

#[cfg(any(feature = "tcp", feature = "udp"))]
{
let delay = network_delay(Instant::from_micros_const(now.try_into().unwrap()))
.map(|d| d.total_micros());

if backoff.is_completed() && delay.unwrap_or(10_000_000) > 10_000 {
let wakeup_time =
timeout.map(|duration| start + u64::try_from(duration.as_micros()).unwrap());
if !no_retransmission {
let ticks = crate::arch::processor::get_timer_ticks();
let network_timer = delay.map(|d| ticks + d);
core_scheduler().add_network_timer(network_timer);
}

// allow network interrupts
if let Some(nic) = nic {
nic.lock().set_polling_mode(false);
}

// switch to another task
task_notify.wait(wakeup_time);

// restore default values
if let Some(nic) = nic {
nic.lock().set_polling_mode(true);
}
backoff.reset();
} else {
backoff.snooze();
}
}
#[cfg(not(any(feature = "tcp", feature = "udp")))]
{
if backoff.is_completed() {
let wakeup_time =
timeout.map(|duration| start + u64::try_from(duration.as_micros()).unwrap());

// switch to another task
task_notify.wait(wakeup_time);

// restore default values
backoff.reset();
} else {
backoff.snooze();
}
}
}
}
Loading