Skip to content

Commit

Permalink
Fix CI
Browse files Browse the repository at this point in the history
  • Loading branch information
daxpedda committed Aug 25, 2023
1 parent 63294d3 commit d02c2c8
Show file tree
Hide file tree
Showing 11 changed files with 271 additions and 82 deletions.
42 changes: 42 additions & 0 deletions .cargo/config.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
[target.'cfg(all())']
rustflags = [
# Deny `unsafe`.
"-Dunsafe_code",
# Clippy groups.
"-Wclippy::pedantic",
"-Wclippy::cargo",
# Allowed Clippy lints.
"-Aclippy::tabs_in_doc_comments",
# Rustdoc group.
"-Wrustdoc::all",
# Rust groups.
"-Wfuture_incompatible",
"-Wrust_2018_compatibility",
"-Wrust_2018_idioms",
"-Wrust_2021_compatibility",
"-Wunused",
# Rust lints.
"-Wdeprecated_in_future",
"-Wffi_unwind_calls",
"-Winvalid-reference_casting",
"-Wmacro_use_extern_crate",
"-Wmeta_variable_misuse",
"-Wmissing_abi",
"-Wmissing_copy_implementations",
"-Wmissing_debug_implementations",
"-Wmissing_docs",
"-Wnon_ascii_idents",
"-Wnoop_method_call",
"-Wsingle_use_lifetimes",
"-Wtrivial_casts",
"-Wtrivial_numeric_casts",
"-Wunreachable_pub",
"-Wunsafe_op_in_unsafe_fn",
"-Wunused_crate_dependencies",
"-Wunused_import_braces",
"-Wunused_lifetimes",
"-Wunused_qualifications",
"-Wunused_tuple_struct_fields",
# Allowed Rust lints.
"-Aunused_crate_dependencies",
]
11 changes: 8 additions & 3 deletions .github/workflows/rust.yml
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,7 @@ jobs:
- name: Run doc tests
run: |
rustup toolchain install nightly --profile minimal --allow-downgrade
cargo +nightly test --workspace --doc --all-features
cargo test --workspace --doc --all-features
msrv:
runs-on: ubuntu-latest
Expand Down Expand Up @@ -63,6 +62,12 @@ jobs:
run: |
cargo clippy --workspace --all-targets --all-features -- -D warnings
- name: Run Rustdoc
run: |
cargo doc --no-deps --workspace --all-features
env:
RUSTDOCFLAGS: -D warnings

docs:
runs-on: ubuntu-latest
if: github.ref == 'refs/heads/main'
Expand All @@ -79,7 +84,7 @@ jobs:
rustup install nightly --profile minimal
cargo +nightly doc --no-deps --workspace --all-features
env:
RUSTDOCFLAGS: -D warnings
RUSTDOCFLAGS: --cfg docsrs

- name: Deploy Docs
uses: JamesIves/github-pages-deploy-action@releases/v4
Expand Down
3 changes: 1 addition & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,10 @@ flume = { version = "0.11" }
futures-channel = "0.3"
futures-executor = "0.3"
futures-util = "0.3"
if_chain = "1"
parking_lot = { version = "0.12", features = ["send_guard"] }
pin-project = "1"
quinn = "0.10.1"
rcgen = { version = "0.11.0", default-features = false, optional = true }
ring = "0.16"
rustls = { version = "0.21.1", default-features = false, features = [
"dangerous_configuration",
] }
Expand Down Expand Up @@ -65,4 +63,5 @@ lto = true

[package.metadata.docs.rs]
features = ["dangerous", "rcgen", "trust-dns"]
rustdoc-args = ["--cfg", "docsrs"]
targets = []
4 changes: 3 additions & 1 deletion examples/basic.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
//! TODO

use anyhow::{Error, Result};
use fabruic::{Endpoint, KeyPair};
use futures_util::{future, StreamExt, TryFutureExt};
Expand Down Expand Up @@ -55,7 +57,7 @@ async fn main() -> Result<()> {
);

// send message
sender.send(&format!("hello from client {}", index))?;
sender.send(&format!("hello from client {index}"))?;

// start listening to new incoming messages
// in this example we know there is only 1 incoming message, so we will
Expand Down
98 changes: 98 additions & 0 deletions examples/onestream.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
//! TODO

use anyhow::{Error, Result};
use fabruic::{Endpoint, KeyPair};
use futures_util::StreamExt;

const SERVER_NAME: &str = "test";
const SERVER_PORT: u16 = 5001;
const CLIENTS: usize = 100;

#[tokio::main]
#[cfg_attr(test, test)]
async fn main() -> Result<()> {
// generate a certificate pair
let key_pair = KeyPair::new_self_signed(SERVER_NAME);

// start the server
let server = Endpoint::new_server(SERVER_PORT, key_pair.clone())?;
let address = format!("quic://{}", server.local_address()?);
println!("[server] Listening on {address}");
tokio::spawn(run_server(server));

// build a client
let client = Endpoint::new_client()?;

let connection = client
.connect_pinned(address, key_pair.end_entity_certificate(), None)
.await?
.accept::<()>()
.await?;
connection.close_incoming().await?;

// initiate a stream
let (sender, receiver) = connection.open_stream::<String, String>(&()).await?;

let tasks = (0..CLIENTS)
.map(|_| {
let sender = sender.clone();
let mut receiver = receiver.clone();
async move {
sender.send(&String::from("test"))?;
let value = receiver.next().await.expect("didn't get a response")?;
assert_eq!(value, "test");
Ok(())
}
})
.collect::<Vec<_>>();

futures_util::future::join_all(tasks)
.await
.into_iter()
.collect::<Result<Vec<()>, Error>>()?;

// wait for client to finish cleanly
client.wait_idle().await;

Ok(())
}

async fn run_server(mut server: Endpoint) -> Result<(), Error> {
// start listening to new incoming connections
// in this example we know there is `CLIENTS` number of clients, so we will not
// wait for more
let mut connection = server
.next()
.await
.expect("connection failed")
.accept::<()>()
.await?;
println!("[server] New Connection: {}", connection.remote_address());

// start listening to new incoming streams
// in this example we know there is only 1 incoming stream, so we will not wait
// for more
let incoming = connection.next().await.expect("no stream found")?;
connection.close_incoming().await?;
println!(
"[server] New incoming stream from: {}",
connection.remote_address()
);

// accept stream
let (sender, mut receiver) = incoming.accept::<String, String>().await?;

// start listening to new incoming messages
// in this example we know there is only 1 incoming message, so we will not wait
// for more
while let Some(message) = receiver.next().await {
let message = message?;
sender.send(&message)?;
}

// wait for stream to finish
sender.finish().await?;
receiver.finish().await?;

Ok(())
}
113 changes: 113 additions & 0 deletions examples/twostream.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
//! TODO

use anyhow::{Error, Result};
use fabruic::{Endpoint, Incoming, KeyPair};
use futures_util::StreamExt;

const SERVER_NAME: &str = "test";
const SERVER_PORT: u16 = 5002;
const REQUESTS_PER_STREAM: usize = 10;
const STREAMS: usize = 1000;

#[tokio::main(worker_threads = 16)]
#[cfg_attr(test, test)]
async fn main() -> Result<()> {
// generate a certificate pair
let key_pair = KeyPair::new_self_signed(SERVER_NAME);

// start the server
let server = Endpoint::new_server(SERVER_PORT, key_pair.clone())?;
let address = format!("quic://{}", server.local_address()?);
println!("[server] Listening on {address}");
tokio::spawn(run_server(server));

// build a client
let client = Endpoint::new_client()?;

let connection = client
.connect_pinned(address, key_pair.end_entity_certificate(), None)
.await?
.accept::<()>()
.await?;
connection.close_incoming().await?;

// initiate a stream

let tasks = (0..STREAMS)
.map(|_| async {
let (sender, receiver) = connection.open_stream::<String, String>(&()).await.unwrap();
(0..REQUESTS_PER_STREAM).for_each(|_| {
let sender = sender.clone();
let mut receiver = receiver.clone();
tokio::task::spawn(async move {
sender.send(&String::from("test"))?;
let value = receiver.next().await.expect("didn't get a response")?;
assert_eq!(value, "test");
Result::<(), Error>::Ok(())
});
});
Ok(())
})
.collect::<Vec<_>>();

futures_util::future::join_all(tasks)
.await
.into_iter()
.collect::<Result<Vec<()>, Error>>()
.unwrap();

// wait for client to finish cleanly
client.wait_idle().await;

Ok(())
}

async fn run_server(mut server: Endpoint) -> Result<(), Error> {
// start listening to new incoming connections
// in this example we know there is `CLIENTS` number of clients, so we will not
// wait for more
while let Some(connection) = server.next().await {
let connection = connection.accept::<()>().await?;
println!("[server] New Connection: {}", connection.remote_address());

// every new incoming connections is handled in it's own task
tokio::spawn(run_connection(connection));
}

Ok(())
}

async fn run_connection(mut connection: fabruic::Connection<()>) -> Result<(), Error> {
// start listening to new incoming streams
// in this example we know there is only 1 incoming stream, so we will not wait
// for more
while let Some(incoming) = connection.next().await {
// connection.close_incoming().await?;
/*println!(
"[server] New incoming stream from: {}",
connection.remote_address()
);*/

tokio::spawn(run_stream(incoming?));
}

Ok(())
}

async fn run_stream(incoming: Incoming<()>) -> Result<(), Error> {
let (sender, mut receiver) = incoming.accept::<String, String>().await?;

// start listening to new incoming messages
// in this example we know there is only 1 incoming message, so we will not wait
// for more
while let Some(message) = receiver.next().await {
let message = message?;
sender.send(&message)?;
}

// wait for stream to finish
sender.finish().await?;
receiver.finish().await?;

Ok(())
}
2 changes: 0 additions & 2 deletions src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ use quinn::ConnectionClose;
pub use quinn::{ConnectError, ConnectionError, ReadError, WriteError};
use thiserror::Error;
#[cfg(feature = "trust-dns")]
#[cfg_attr(doc, doc(cfg(feature = "trust-dns")))]
pub use trust_dns_resolver::error::ResolveError;
pub use url::ParseError;
pub use webpki::Error;
Expand Down Expand Up @@ -161,7 +160,6 @@ pub enum Connect {
ParseDomain(ParseError),
/// Failed to resolve domain with [`trust-dns`](trust_dns_resolver).
#[cfg(feature = "trust-dns")]
#[cfg_attr(doc, doc(cfg(feature = "trust-dns")))]
#[error("Error resolving domain with trust-dns: {0}")]
TrustDns(#[from] Box<ResolveError>),
/// Failed to resolve domain with
Expand Down
Loading

0 comments on commit d02c2c8

Please sign in to comment.