Skip to content

Commit

Permalink
Merge branch 'develop' into jsdw-release-v0.6.0
Browse files Browse the repository at this point in the history
  • Loading branch information
jsdw committed Jun 15, 2021
2 parents f6a1072 + 3bdc666 commit c85da2f
Show file tree
Hide file tree
Showing 14 changed files with 2,539 additions and 2,479 deletions.
114 changes: 114 additions & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
name: Rust

on:
push:
# Run jobs when commits are pushed to
# develop or release-like branches:
branches:
- develop
- release*
pull_request:
# Run jobs for any PR that wants to merge
# to develop:
branches:
- develop

env:
CARGO_TERM_COLOR: always

jobs:
build:
name: Check Code
runs-on: ubuntu-latest
steps:
- name: Checkout sources
uses: actions/[email protected]

- name: Install Rust stable toolchain
uses: actions-rs/[email protected]
with:
profile: minimal
toolchain: stable
override: true

- name: Rust Cache
uses: Swatinem/[email protected]

- name: Build
uses: actions-rs/[email protected]
with:
command: check
args: --all-targets

fmt:
name: Run rustfmt
runs-on: ubuntu-latest
steps:
- name: Checkout sources
uses: actions/[email protected]

- name: Install Rust stable toolchain
uses: actions-rs/[email protected]
with:
profile: minimal
toolchain: stable
override: true
components: clippy, rustfmt

- name: Rust Cache
uses: Swatinem/[email protected]

- name: Cargo fmt
uses: actions-rs/[email protected]
with:
command: fmt
args: --all -- --check

docs:
name: Check Documentation
runs-on: ubuntu-latest
steps:
- name: Checkout sources
uses: actions/[email protected]

- name: Install Rust stable toolchain
uses: actions-rs/[email protected]
with:
profile: minimal
toolchain: stable
override: true

- name: Rust Cache
uses: Swatinem/[email protected]

- name: Check internal documentation links
run: RUSTDOCFLAGS="--deny broken_intra_doc_links" cargo doc --verbose --workspace --no-deps --document-private-items

tests:
name: Run tests
runs-on: ubuntu-latest
steps:
- name: Checkout sources
uses: actions/[email protected]

- name: Install Rust stable toolchain
uses: actions-rs/[email protected]
with:
profile: minimal
toolchain: stable
override: true

- name: Rust Cache
uses: Swatinem/[email protected]

- name: Cargo build
uses: actions-rs/[email protected]
with:
command: build
args: --workspace

- name: Cargo test
uses: actions-rs/[email protected]
with:
command: test

20 changes: 0 additions & 20 deletions .github/workflows/rust.yml

This file was deleted.

112 changes: 56 additions & 56 deletions examples/autobahn_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
// See https://github.com/crossbario/autobahn-testsuite for details.

use futures::io::{BufReader, BufWriter};
use soketto::{BoxedError, connection, handshake};
use soketto::{connection, handshake, BoxedError};
use std::str::FromStr;
use tokio::net::TcpStream;
use tokio_util::compat::{Compat, TokioAsyncReadCompatExt};
Expand All @@ -24,76 +24,76 @@ const SOKETTO_VERSION: &str = env!("CARGO_PKG_VERSION");

#[tokio::main]
async fn main() -> Result<(), BoxedError> {
let n = num_of_cases().await?;
for i in 1 ..= n {
if let Err(e) = run_case(i).await {
log::error!("case {}: {:?}", i, e)
}
}
update_report().await?;
Ok(())
let n = num_of_cases().await?;
for i in 1..=n {
if let Err(e) = run_case(i).await {
log::error!("case {}: {:?}", i, e)
}
}
update_report().await?;
Ok(())
}

async fn num_of_cases() -> Result<usize, BoxedError> {
let socket = TcpStream::connect("127.0.0.1:9001").await?;
let mut client = new_client(socket, "/getCaseCount");
assert!(matches!(client.handshake().await?, handshake::ServerResponse::Accepted {..}));
let (_, mut receiver) = client.into_builder().finish();
let mut data = Vec::new();
let kind = receiver.receive_data(&mut data).await?;
assert!(kind.is_text());
let num = usize::from_str(std::str::from_utf8(&data)?)?;
log::info!("{} cases to run", num);
Ok(num)
let socket = TcpStream::connect("127.0.0.1:9001").await?;
let mut client = new_client(socket, "/getCaseCount");
assert!(matches!(client.handshake().await?, handshake::ServerResponse::Accepted { .. }));
let (_, mut receiver) = client.into_builder().finish();
let mut data = Vec::new();
let kind = receiver.receive_data(&mut data).await?;
assert!(kind.is_text());
let num = usize::from_str(std::str::from_utf8(&data)?)?;
log::info!("{} cases to run", num);
Ok(num)
}

async fn run_case(n: usize) -> Result<(), BoxedError> {
log::info!("running case {}", n);
let resource = format!("/runCase?case={}&agent=soketto-{}", n, SOKETTO_VERSION);
let socket = TcpStream::connect("127.0.0.1:9001").await?;
let mut client = new_client(socket, &resource);
assert!(matches!(client.handshake().await?, handshake::ServerResponse::Accepted {..}));
let (mut sender, mut receiver) = client.into_builder().finish();
let mut message = Vec::new();
loop {
message.clear();
match receiver.receive_data(&mut message).await {
Ok(soketto::Data::Binary(n)) => {
assert_eq!(n, message.len());
sender.send_binary_mut(&mut message).await?;
sender.flush().await?
}
Ok(soketto::Data::Text(n)) => {
assert_eq!(n, message.len());
sender.send_text(std::str::from_utf8(&message)?).await?;
sender.flush().await?
}
Err(connection::Error::Closed) => return Ok(()),
Err(e) => return Err(e.into())
}
}
log::info!("running case {}", n);
let resource = format!("/runCase?case={}&agent=soketto-{}", n, SOKETTO_VERSION);
let socket = TcpStream::connect("127.0.0.1:9001").await?;
let mut client = new_client(socket, &resource);
assert!(matches!(client.handshake().await?, handshake::ServerResponse::Accepted { .. }));
let (mut sender, mut receiver) = client.into_builder().finish();
let mut message = Vec::new();
loop {
message.clear();
match receiver.receive_data(&mut message).await {
Ok(soketto::Data::Binary(n)) => {
assert_eq!(n, message.len());
sender.send_binary_mut(&mut message).await?;
sender.flush().await?
}
Ok(soketto::Data::Text(n)) => {
assert_eq!(n, message.len());
sender.send_text(std::str::from_utf8(&message)?).await?;
sender.flush().await?
}
Err(connection::Error::Closed) => return Ok(()),
Err(e) => return Err(e.into()),
}
}
}

async fn update_report() -> Result<(), BoxedError> {
log::info!("requesting report generation");
let resource = format!("/updateReports?agent=soketto-{}", SOKETTO_VERSION);
let socket = TcpStream::connect("127.0.0.1:9001").await?;
let mut client = new_client(socket, &resource);
assert!(matches!(client.handshake().await?, handshake::ServerResponse::Accepted {..}));
client.into_builder().finish().0.close().await?;
Ok(())
log::info!("requesting report generation");
let resource = format!("/updateReports?agent=soketto-{}", SOKETTO_VERSION);
let socket = TcpStream::connect("127.0.0.1:9001").await?;
let mut client = new_client(socket, &resource);
assert!(matches!(client.handshake().await?, handshake::ServerResponse::Accepted { .. }));
client.into_builder().finish().0.close().await?;
Ok(())
}

#[cfg(not(feature = "deflate"))]
fn new_client(socket: TcpStream, path: &str) -> handshake::Client<'_, BufReader<BufWriter<Compat<TcpStream>>>> {
handshake::Client::new(BufReader::new(BufWriter::new(socket.compat())), "127.0.0.1:9001", path)
handshake::Client::new(BufReader::new(BufWriter::new(socket.compat())), "127.0.0.1:9001", path)
}

#[cfg(feature = "deflate")]
fn new_client(socket: TcpStream, path: &str) -> handshake::Client<'_, BufReader<BufWriter<Compat<TcpStream>>>> {
let socket = BufReader::with_capacity(8 * 1024, BufWriter::with_capacity(64 * 1024, socket.compat()));
let mut client = handshake::Client::new(socket, "127.0.0.1:9001", path);
let deflate = soketto::extension::deflate::Deflate::new(soketto::Mode::Client);
client.add_extension(Box::new(deflate));
client
let socket = BufReader::with_capacity(8 * 1024, BufWriter::with_capacity(64 * 1024, socket.compat()));
let mut client = handshake::Client::new(socket, "127.0.0.1:9001", path);
let deflate = soketto::extension::deflate::Deflate::new(soketto::Mode::Client);
client.add_extension(Box::new(deflate));
client
}
92 changes: 46 additions & 46 deletions examples/autobahn_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,62 +15,62 @@
// See https://github.com/crossbario/autobahn-testsuite for details.

use futures::io::{BufReader, BufWriter};
use soketto::{BoxedError, connection, handshake};
use soketto::{connection, handshake, BoxedError};
use tokio::net::{TcpListener, TcpStream};
use tokio_util::compat::{Compat, TokioAsyncReadCompatExt};
use tokio_stream::{wrappers::TcpListenerStream, StreamExt};
use tokio_util::compat::{Compat, TokioAsyncReadCompatExt};
#[tokio::main]
async fn main() -> Result<(), BoxedError> {
let listener = TcpListener::bind("127.0.0.1:9001").await?;
let mut incoming = TcpListenerStream::new(listener);
while let Some(socket) = incoming.next().await {
let mut server = new_server(socket?);
let key = {
let req = server.receive_request().await?;
req.key()
};
let accept = handshake::server::Response::Accept { key, protocol: None };
server.send_response(&accept).await?;
let (mut sender, mut receiver) = server.into_builder().finish();
let mut message = Vec::new();
loop {
message.clear();
match receiver.receive_data(&mut message).await {
Ok(soketto::Data::Binary(n)) => {
assert_eq!(n, message.len());
sender.send_binary_mut(&mut message).await?;
sender.flush().await?
}
Ok(soketto::Data::Text(n)) => {
assert_eq!(n, message.len());
if let Ok(txt) = std::str::from_utf8(&message) {
sender.send_text(txt).await?;
sender.flush().await?
} else {
break
}
}
Err(connection::Error::Closed) => break,
Err(e) => {
log::error!("connection error: {}", e);
break
}
}
}
}
Ok(())
let listener = TcpListener::bind("127.0.0.1:9001").await?;
let mut incoming = TcpListenerStream::new(listener);
while let Some(socket) = incoming.next().await {
let mut server = new_server(socket?);
let key = {
let req = server.receive_request().await?;
req.key()
};
let accept = handshake::server::Response::Accept { key, protocol: None };
server.send_response(&accept).await?;
let (mut sender, mut receiver) = server.into_builder().finish();
let mut message = Vec::new();
loop {
message.clear();
match receiver.receive_data(&mut message).await {
Ok(soketto::Data::Binary(n)) => {
assert_eq!(n, message.len());
sender.send_binary_mut(&mut message).await?;
sender.flush().await?
}
Ok(soketto::Data::Text(n)) => {
assert_eq!(n, message.len());
if let Ok(txt) = std::str::from_utf8(&message) {
sender.send_text(txt).await?;
sender.flush().await?
} else {
break;
}
}
Err(connection::Error::Closed) => break,
Err(e) => {
log::error!("connection error: {}", e);
break;
}
}
}
}
Ok(())
}

#[cfg(not(feature = "deflate"))]
fn new_server<'a>(socket: TcpStream) -> handshake::Server<'a, BufReader<BufWriter<Compat<TcpStream>>>> {
handshake::Server::new(BufReader::new(BufWriter::new(socket.compat())))
handshake::Server::new(BufReader::new(BufWriter::new(socket.compat())))
}

#[cfg(feature = "deflate")]
fn new_server<'a>(socket: TcpStream) -> handshake::Server<'a, BufReader<BufWriter<Compat<TcpStream>>>> {
let socket = BufReader::with_capacity(8 * 1024, BufWriter::with_capacity(16 * 1024, socket.compat()));
let mut server = handshake::Server::new(socket);
let deflate = soketto::extension::deflate::Deflate::new(soketto::Mode::Server);
server.add_extension(Box::new(deflate));
server
let socket = BufReader::with_capacity(8 * 1024, BufWriter::with_capacity(16 * 1024, socket.compat()));
let mut server = handshake::Server::new(socket);
let deflate = soketto::extension::deflate::Deflate::new(soketto::Mode::Server);
server.add_extension(Box::new(deflate));
server
}
4 changes: 4 additions & 0 deletions rustfmt.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
hard_tabs = true
max_width = 120
use_small_heuristics = "Max"
edition = "2018"
Loading

0 comments on commit c85da2f

Please sign in to comment.