Skip to content

Commit

Permalink
fix sleep
Browse files Browse the repository at this point in the history
  • Loading branch information
insipx committed Nov 27, 2024
1 parent e1d4474 commit 11a08da
Show file tree
Hide file tree
Showing 12 changed files with 143 additions and 109 deletions.
4 changes: 2 additions & 2 deletions .cargo/config.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,5 +10,5 @@ r = "run"
xli = "run --bin xmtp_cli --"
xdbg = "run --release --bin xdbg --"

[build]
rustflags = ["--cfg", "tracing_unstable"]
# [build]
# rustflags = ["--cfg", "tracing_unstable"]
11 changes: 6 additions & 5 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@ members = [
"bindings_node",
"bindings_ffi",
"xtask",
"xmtp_debug"
, "common"]
"xmtp_debug",
"common"]

# Make the feature resolver explicit.
# See https://doc.rust-lang.org/edition-guide/rust-2021/default-cargo-resolver.html#details
Expand Down
7 changes: 4 additions & 3 deletions common/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,15 +9,16 @@ web-time.workspace = true
tracing.workspace = true
tokio = { workspace = true, features = ["time"] }
rand = "0.8"
futures.workspace = true
xmtp_cryptography.workspace = true

parking_lot = { workspace = true, optional = true }
futures = { workspace = true, optional = true }
tracing-subscriber = { workspace = true, features = ["fmt", "env-filter", "ansi"], optional = true }

[target.'cfg(target_arch = "wasm32")'.dependencies]
getrandom = { workspace = true, features = ["js"] }
gloo-timers = { workspace = true, features = ["futures"] }
tracing-web = { version = "0.1", optional = true }
tracing-wasm = { version = "0.2", optional = true }
console_error_panic_hook = { version = "0.1", optional = true }

[dev-dependencies]
Expand All @@ -31,4 +32,4 @@ wasm-bindgen-test.workspace = true
tokio = { workspace = true, features = ["time", "macros", "rt-multi-thread", "sync"]}

[features]
test-utils = ["dep:parking_lot", "dep:futures", "dep:tracing-subscriber", "dep:tracing-web", "dep:console_error_panic_hook"]
test-utils = ["dep:parking_lot", "dep:tracing-subscriber", "dep:tracing-wasm", "dep:console_error_panic_hook"]
34 changes: 11 additions & 23 deletions common/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,30 +16,18 @@ pub trait Wasm {}
#[cfg(target_arch = "wasm32")]
impl<T> Wasm for T {}

pub mod time {
#[cfg(not(all(target_family = "wasm", target_os = "unknown")))]
pub use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
#[cfg(all(target_family = "wasm", target_os = "unknown"))]
pub use web_time::{Duration, Instant, SystemTime, UNIX_EPOCH};

pub fn now_ns() -> i64 {
let now = SystemTime::now();

now.duration_since(UNIX_EPOCH)
.expect("Time went backwards")
.as_nanos() as i64
}
}
pub mod time;

// WASM Shims
#[cfg(target_arch = "wasm32")]
#[doc(hidden)]
pub async fn sleep(duration: core::time::Duration) {
gloo_timers::future::TimeoutFuture::new(duration.as_millis() as u32).await;

use xmtp_cryptography::utils as crypto_utils;
use rand::{distributions::{Alphanumeric, DistString}, RngCore};
pub fn rand_string<const N: usize>() -> String {
Alphanumeric.sample_string(&mut crypto_utils::rng(), N)
}

#[cfg(not(target_arch = "wasm32"))]
#[doc(hidden)]
pub async fn sleep(duration: core::time::Duration) {
tokio::time::sleep(duration).await
pub fn rand_array<const N: usize>() -> [u8; N] {
let mut buffer = [0u8; N];
crypto_utils::rng().fill_bytes(&mut buffer);
buffer
}

6 changes: 3 additions & 3 deletions common/src/retry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,7 @@ macro_rules! retry_async {
e.to_string()
);
attempts += 1;
$crate::sleep($retry.duration(attempts)).await;
$crate::time::sleep($retry.duration(attempts)).await;
} else {
tracing::info!("error is not retryable. {:?}:{}", e, e);
break Err(e);
Expand Down Expand Up @@ -308,7 +308,7 @@ pub(crate) mod tests {
return Ok(());
}
// do some work
crate::sleep(core::time::Duration::from_nanos(100)).await;
crate::time::sleep(core::time::Duration::from_nanos(100)).await;
Err(SomeError::ARetryableError)
}

Expand All @@ -334,7 +334,7 @@ pub(crate) mod tests {
}
*data += 1;
// do some work
crate::sleep(core::time::Duration::from_nanos(100)).await;
crate::time::sleep(core::time::Duration::from_nanos(100)).await;
Err(SomeError::ARetryableError)
}

Expand Down
37 changes: 11 additions & 26 deletions common/src/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,11 @@
use rand::{
distributions::{Alphanumeric, DistString},
seq::IteratorRandom,
Rng, RngCore,
Rng,
};
use std::sync::OnceLock;
use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt, EnvFilter};
use xmtp_cryptography::utils as crypto_utils;

#[cfg(not(target_arch = "wasm32"))]
pub mod traced_test;
Expand Down Expand Up @@ -35,67 +36,51 @@ pub fn logger() {
/// A simple test logger that defaults to the INFO level
#[cfg(all(target_family = "wasm", target_os = "unknown"))]
pub fn logger() {
use tracing_subscriber::fmt::format::Pretty;
INIT.get_or_init(|| {
let filter = EnvFilter::builder()
.with_default_directive(tracing::metadata::LevelFilter::INFO.into())
.with_default_directive(tracing::metadata::LevelFilter::DEBUG.into())
.from_env_lossy();

let fmt = tracing_subscriber::fmt::layer()
.with_ansi(true) // not supported by all browsers
.without_time() // std::time break things, but chrono might work
.with_writer(tracing_web::MakeWebConsoleWriter::new());

tracing_subscriber::registry()
.with(fmt)
.with(tracing_wasm::WASMLayer::default())
.with(filter)
.with(tracing_web::performance_layer().with_details_from_fields(Pretty::default()))
.init();

console_error_panic_hook::set_once();
});
}

pub fn rand_string<const N: usize>() -> String {
Alphanumeric.sample_string(&mut rand::thread_rng(), N)
}

pub fn rand_hexstring() -> String {
let mut rng = crypto_utils::rng();
let hex_chars = "0123456789abcdef";
let v: String = (0..40)
.map(|_| hex_chars.chars().choose(&mut rand::thread_rng()).unwrap())
.map(|_| hex_chars.chars().choose(&mut rng).unwrap())
.collect();

format!("0x{}", v)
}

pub fn rand_account_address() -> String {
Alphanumeric.sample_string(&mut rand::thread_rng(), 42)
Alphanumeric.sample_string(&mut crypto_utils::rng(), 42)
}

pub fn rand_vec<const N: usize>() -> Vec<u8> {
rand_array::<N>().to_vec()
}

pub fn rand_array<const N: usize>() -> [u8; N] {
let mut buffer = [0u8; N];
rand::thread_rng().fill_bytes(&mut buffer);
buffer
crate::rand_array::<N>().to_vec()
}

pub fn rand_u64() -> u64 {
rand::thread_rng().gen()
crypto_utils::rng().gen()
}

#[cfg(not(target_arch = "wasm32"))]
pub fn tmp_path() -> String {
let db_name = rand_string::<24>();
let db_name = crate::rand_string::<24>();
format!("{}/{}.db3", std::env::temp_dir().to_str().unwrap(), db_name)
}

#[cfg(target_arch = "wasm32")]
pub fn tmp_path() -> String {
let db_name = rand_string::<24>();
let db_name = crate::rand_string::<24>();
format!("{}/{}.db3", "test_db", db_name)
}

Expand Down
74 changes: 74 additions & 0 deletions common/src/time.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
//! Time primitives for native and WebAssembly
use std::fmt;

#[derive(Debug)]
pub struct Expired;

impl std::error::Error for Expired {
fn description(&self) -> &str {
"Timer duration expired"
}
}

#[cfg(not(target_arch = "wasm32"))]
impl From<tokio::time::error::Elapsed> for Expired {
fn from(_: tokio::time::error::Elapsed) -> Expired {
Expired
}
}

impl fmt::Display for Expired {
fn fmt(&self, f: &mut fmt::Formatter) -> std::fmt::Result {
write!(f, "timer duration expired")
}
}

#[cfg(not(all(target_family = "wasm", target_os = "unknown")))]
pub use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
#[cfg(all(target_family = "wasm", target_os = "unknown"))]
pub use web_time::{Duration, Instant, SystemTime, UNIX_EPOCH};

pub fn now_ns() -> i64 {
let now = SystemTime::now();

now.duration_since(UNIX_EPOCH)
.expect("Time went backwards")
.as_nanos() as i64
}

#[cfg(target_arch = "wasm32")]
pub async fn timeout<F>(duration: Duration, future: F) -> Result<F::Output, Expired>
where
F: std::future::IntoFuture,
{
use futures::future::Either::*;
let timeout = gloo_timers::future::TimeoutFuture::new(duration.as_millis() as u32);
let future = future.into_future();
futures::pin_mut!(future);
match futures::future::select(timeout, future).await {
Left(_) => Err(Expired),
Right((value, _)) => Ok(value),
}
}

#[cfg(not(target_arch = "wasm32"))]
pub async fn timeout<F>(duration: Duration, future: F) -> Result<F::Output, Expired>
where
F: std::future::IntoFuture,
{
tokio::time::timeout(duration, future).await.map_err(Into::into)
}

// WASM Shims
#[cfg(target_arch = "wasm32")]
#[doc(hidden)]
pub async fn sleep(duration: Duration) {
gloo_timers::future::TimeoutFuture::new(duration.as_millis() as u32).await;
}

#[cfg(not(target_arch = "wasm32"))]
#[doc(hidden)]
pub async fn sleep(duration: Duration) {
tokio::time::sleep(duration).await
}
3 changes: 2 additions & 1 deletion xmtp_api_http/src/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ pub fn create_grpc_stream_inner<
.send()
.await
.map_err(|e| Error::new(ErrorKind::MlsError).with(e))?;

tracing::debug!("Got Request, getting byte stream");
let mut remaining = vec![];
for await bytes in request.bytes_stream() {
let bytes = bytes
Expand All @@ -92,6 +92,7 @@ pub fn create_grpc_stream_inner<
let de = Deserializer::from_slice(bytes);
let mut stream = de.into_iter::<GrpcResponse<R>>();
'messages: loop {
tracing::debug!("Waiting on next response ...");
let response = stream.next();
let res = match response {
Some(Ok(GrpcResponse::Ok(response))) => Ok(response),
Expand Down
2 changes: 1 addition & 1 deletion xmtp_mls/src/groups/subscriptions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -424,7 +424,7 @@ pub(crate) mod tests {
// just to make sure stream is started
let _ = start_rx.await;
// Adding in a sleep, since the HTTP API client may acknowledge requests before they are ready
xmtp_common::sleep(core::time::Duration::from_millis(100)).await;
xmtp_common::time::sleep(core::time::Duration::from_millis(100)).await;

amal_group
.add_members_by_inbox_id(&[bola.inbox_id()])
Expand Down
Loading

0 comments on commit 11a08da

Please sign in to comment.