Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Windows Support for Sync implementation #172

Merged
merged 4 commits into from
Mar 27, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions .github/workflows/bvt.yml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ jobs:
runs-on: ${{ matrix.os }}
strategy:
matrix:
os: [ubuntu-latest, macos-latest]
os: [ubuntu-latest, macos-latest, windows-latest]
steps:
- name: Checkout
uses: actions/checkout@v3
Expand All @@ -22,7 +22,7 @@ jobs:
runs-on: ${{ matrix.os }}
strategy:
matrix:
os: [ubuntu-latest, macos-latest]
os: [ubuntu-latest, macos-latest, windows-latest]
steps:
- name: Checkout
uses: actions/checkout@v3
Expand Down
7 changes: 6 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,17 +17,22 @@ nix = "0.23.0"
log = "0.4"
byteorder = "1.3.2"
thiserror = "1.0"

async-trait = { version = "0.1.31", optional = true }
tokio = { version = "1", features = ["rt", "sync", "io-util", "macros", "time"], optional = true }
futures = { version = "0.3", optional = true }

[target.'cfg(windows)'.dependencies]
windows-sys = {version = "0.45", features = [ "Win32_Foundation", "Win32_Storage_FileSystem", "Win32_System_IO", "Win32_System_Pipes", "Win32_Security", "Win32_System_Threading"]}

[target.'cfg(any(target_os = "linux", target_os = "android"))'.dependencies]
tokio-vsock = { version = "0.3.1", optional = true }

[build-dependencies]
protobuf-codegen = "3.1.0"

[dev-dependencies]
assert_cmd = "2.0.7"

[features]
default = ["sync"]
async = ["async-trait", "tokio", "futures", "tokio-vsock"]
Expand Down
7 changes: 6 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,13 @@ build: debug

.PHONY: test
test:
ifeq ($OS,Windows_NT)
jsturtevant marked this conversation as resolved.
Show resolved Hide resolved
# async isn't enabled for windows, don't test that feature
cargo test --verbose
else
cargo test --all-features --verbose

endif

.PHONY: check
check:
cargo fmt --all -- --check
Expand Down
9 changes: 8 additions & 1 deletion example/async-client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,18 @@

mod protocols;
mod utils;

#[cfg(unix)]
use protocols::r#async::{agent, agent_ttrpc, health, health_ttrpc};
use ttrpc::context::{self, Context};
#[cfg(unix)]
use ttrpc::r#async::Client;

#[cfg(windows)]
fn main() {
println!("This example only works on Unix-like OSes");
}

#[cfg(unix)]
#[tokio::main(flavor = "current_thread")]
async fn main() {
let c = Client::connect(utils::SOCK_ADDR).unwrap();
Expand Down
13 changes: 12 additions & 1 deletion example/async-server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,17 +13,22 @@ use std::sync::Arc;

use log::LevelFilter;

#[cfg(unix)]
use protocols::r#async::{agent, agent_ttrpc, health, health_ttrpc, types};
#[cfg(unix)]
use ttrpc::asynchronous::Server;
use ttrpc::error::{Error, Result};
use ttrpc::proto::{Code, Status};

#[cfg(unix)]
use async_trait::async_trait;
#[cfg(unix)]
use tokio::signal::unix::{signal, SignalKind};
use tokio::time::sleep;

struct HealthService;

#[cfg(unix)]
#[async_trait]
impl health_ttrpc::Health for HealthService {
async fn check(
Expand Down Expand Up @@ -58,7 +63,7 @@ impl health_ttrpc::Health for HealthService {
}

struct AgentService;

#[cfg(unix)]
#[async_trait]
impl agent_ttrpc::AgentService for AgentService {
async fn list_interfaces(
Expand All @@ -82,6 +87,12 @@ impl agent_ttrpc::AgentService for AgentService {
}
}

#[cfg(windows)]
fn main() {
println!("This example only works on Unix-like OSes");
}

#[cfg(unix)]
#[tokio::main(flavor = "current_thread")]
async fn main() {
simple_logging::log_to_stderr(LevelFilter::Trace);
Expand Down
15 changes: 14 additions & 1 deletion example/async-stream-client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,18 @@

mod protocols;
mod utils;

#[cfg(unix)]
use protocols::r#async::{empty, streaming, streaming_ttrpc};
use ttrpc::context::{self, Context};
#[cfg(unix)]
use ttrpc::r#async::Client;

#[cfg(windows)]
fn main() {
println!("This example only works on Unix-like OSes");
}

#[cfg(unix)]
#[tokio::main(flavor = "current_thread")]
async fn main() {
simple_logging::log_to_stderr(log::LevelFilter::Info);
Expand Down Expand Up @@ -48,6 +55,7 @@ fn default_ctx() -> Context {
ctx
}

#[cfg(unix)]
async fn echo_request(cli: streaming_ttrpc::StreamingClient) {
let echo1 = streaming::EchoPayload {
seq: 1,
Expand All @@ -59,6 +67,7 @@ async fn echo_request(cli: streaming_ttrpc::StreamingClient) {
assert_eq!(resp.seq, echo1.seq + 1);
}

#[cfg(unix)]
async fn echo_stream(cli: streaming_ttrpc::StreamingClient) {
let mut stream = cli.echo_stream(default_ctx()).await.unwrap();

Expand All @@ -81,6 +90,7 @@ async fn echo_stream(cli: streaming_ttrpc::StreamingClient) {
assert!(matches!(ret, Err(ttrpc::Error::Eof)));
}

#[cfg(unix)]
async fn sum_stream(cli: streaming_ttrpc::StreamingClient) {
let mut stream = cli.sum_stream(default_ctx()).await.unwrap();

Expand Down Expand Up @@ -108,6 +118,7 @@ async fn sum_stream(cli: streaming_ttrpc::StreamingClient) {
assert_eq!(ssum.num, sum.num);
}

#[cfg(unix)]
async fn divide_stream(cli: streaming_ttrpc::StreamingClient) {
let expected = streaming::Sum {
sum: 392,
Expand All @@ -127,6 +138,7 @@ async fn divide_stream(cli: streaming_ttrpc::StreamingClient) {
assert_eq!(actual.num, expected.num);
}

#[cfg(unix)]
async fn echo_null(cli: streaming_ttrpc::StreamingClient) {
let mut stream = cli.echo_null(default_ctx()).await.unwrap();

Expand All @@ -142,6 +154,7 @@ async fn echo_null(cli: streaming_ttrpc::StreamingClient) {
assert_eq!(res, empty::Empty::new());
}

#[cfg(unix)]
async fn echo_null_stream(cli: streaming_ttrpc::StreamingClient) {
let stream = cli.echo_null_stream(default_ctx()).await.unwrap();

Expand Down
11 changes: 11 additions & 0 deletions example/async-stream-server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,15 +10,20 @@ use std::sync::Arc;

use log::{info, LevelFilter};

#[cfg(unix)]
use protocols::r#async::{empty, streaming, streaming_ttrpc};
#[cfg(unix)]
use ttrpc::asynchronous::Server;

#[cfg(unix)]
use async_trait::async_trait;
#[cfg(unix)]
use tokio::signal::unix::{signal, SignalKind};
use tokio::time::sleep;

struct StreamingService;

#[cfg(unix)]
#[async_trait]
impl streaming_ttrpc::Streaming for StreamingService {
async fn echo(
Expand Down Expand Up @@ -131,6 +136,12 @@ impl streaming_ttrpc::Streaming for StreamingService {
}
}

#[cfg(windows)]
fn main() {
println!("This example only works on Unix-like OSes");
}

#[cfg(unix)]
#[tokio::main(flavor = "current_thread")]
async fn main() {
simple_logging::log_to_stderr(LevelFilter::Info);
Expand Down
4 changes: 2 additions & 2 deletions example/build.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ fn main() {
async_all: true,
..Default::default()
})
.rust_protobuf_customize(protobuf_customized.clone())
.rust_protobuf_customize(protobuf_customized)
.run()
.expect("Gen async code failed.");

Expand Down Expand Up @@ -75,7 +75,7 @@ fn replace_text_in_file(file_name: &str, from: &str, to: &str) -> Result<(), std

let new_contents = contents.replace(from, to);

let mut dst = File::create(&file_name)?;
let mut dst = File::create(file_name)?;
dst.write(new_contents.as_bytes())?;

Ok(())
Expand Down
77 changes: 55 additions & 22 deletions example/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,17 @@
mod protocols;
mod utils;

use log::LevelFilter;
use protocols::sync::{agent, agent_ttrpc, health, health_ttrpc};
use std::thread;
use ttrpc::context::{self, Context};
use ttrpc::error::Error;
use ttrpc::proto::Code;
use ttrpc::Client;

fn main() {
simple_logging::log_to_stderr(LevelFilter::Trace);

let c = Client::connect(utils::SOCK_ADDR).unwrap();
let hc = health_ttrpc::HealthClient::new(c.clone());
let ac = agent_ttrpc::AgentServiceClient::new(c);
Expand All @@ -33,69 +38,97 @@ fn main() {
let t = thread::spawn(move || {
let req = health::CheckRequest::new();
println!(
"OS Thread {:?} - {} started: {:?}",
"OS Thread {:?} - health.check() started: {:?}",
std::thread::current().id(),
"health.check()",
now.elapsed(),
);

let rsp = thc.check(default_ctx(), &req);
match rsp.as_ref() {
Err(Error::RpcStatus(s)) => {
assert_eq!(Code::NOT_FOUND, s.code());
assert_eq!("Just for fun".to_string(), s.message())
}
Err(e) => {
panic!("not expecting an error from the example server: {:?}", e)
}
Ok(x) => {
panic!("not expecting a OK response from the example server: {:?}", x)
}
}
println!(
"OS Thread {:?} - {} -> {:?} ended: {:?}",
"OS Thread {:?} - health.check() -> {:?} ended: {:?}",
std::thread::current().id(),
"health.check()",
thc.check(default_ctx(), &req),
rsp,
now.elapsed(),
);
});

let t2 = thread::spawn(move || {
println!(
"OS Thread {:?} - {} started: {:?}",
"OS Thread {:?} - agent.list_interfaces() started: {:?}",
std::thread::current().id(),
"agent.list_interfaces()",
now.elapsed(),
);

let show = match tac.list_interfaces(default_ctx(), &agent::ListInterfacesRequest::new()) {
Err(e) => format!("{:?}", e),
Ok(s) => format!("{:?}", s),
Err(e) => {
panic!("not expecting an error from the example server: {:?}", e)
}
Ok(s) => {
assert_eq!("first".to_string(), s.Interfaces[0].name);
assert_eq!("second".to_string(), s.Interfaces[1].name);
format!("{s:?}")
}
};

println!(
"OS Thread {:?} - {} -> {} ended: {:?}",
"OS Thread {:?} - agent.list_interfaces() -> {} ended: {:?}",
std::thread::current().id(),
"agent.list_interfaces()",
show,
now.elapsed(),
);
});

println!(
"Main OS Thread - {} started: {:?}",
"agent.online_cpu_mem()",
"Main OS Thread - agent.online_cpu_mem() started: {:?}",
now.elapsed()
);
let show = match ac.online_cpu_mem(default_ctx(), &agent::OnlineCPUMemRequest::new()) {
Err(e) => format!("{:?}", e),
Ok(s) => format!("{:?}", s),
Err(Error::RpcStatus(s)) => {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

assert! and assert_eq! can take additional parameters to act like format!

https://doc.rust-lang.org/std/macro.assert.html#custom-messages

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I tried this out but since the macro doesn't return anything it isn't working. It seems the formatting is only used when failed. From the doc: Expressions used as format arguments will only be evaluated if the assertion fails.

assert_eq!(Code::NOT_FOUND, s.code());
assert_eq!(
"/grpc.AgentService/OnlineCPUMem is not supported".to_string(),
s.message()
);
format!("{s:?}")
}
Err(e) => {
panic!("not expecting an error from the example server: {:?}", e)
}
Ok(s) => {
panic!("not expecting a OK response from the example server: {:?}", s)
}
};
println!(
"Main OS Thread - {} -> {} ended: {:?}",
"agent.online_cpu_mem()",
"Main OS Thread - agent.online_cpu_mem() -> {} ended: {:?}",
show,
now.elapsed()
);

println!("\nsleep 2 seconds ...\n");
thread::sleep(std::time::Duration::from_secs(2));

let version = hc.version(default_ctx(), &health::CheckRequest::new());
assert_eq!("mock.0.1", version.as_ref().unwrap().agent_version.as_str());
assert_eq!("0.0.1", version.as_ref().unwrap().grpc_version.as_str());
println!(
"Main OS Thread - {} started: {:?}",
"health.version()",
"Main OS Thread - health.version() started: {:?}",
now.elapsed()
);
println!(
"Main OS Thread - {} -> {:?} ended: {:?}",
"health.version()",
hc.version(default_ctx(), &health::CheckRequest::new()),
"Main OS Thread - health.version() -> {:?} ended: {:?}",
version,
now.elapsed()
);

Expand Down
5 changes: 3 additions & 2 deletions example/protocols/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@
//
// SPDX-License-Identifier: Apache-2.0
//

#[cfg(unix)]
pub mod asynchronous;
pub mod sync;
#[cfg(unix)]
pub use asynchronous as r#async;
pub mod sync;
Loading