Skip to content

Commit

Permalink
Merge pull request #172 from jsturtevant/windows3
Browse files Browse the repository at this point in the history
Windows Support for Sync implementation
  • Loading branch information
lifupan authored Mar 27, 2023
2 parents 78de1c5 + 4975099 commit ebbb58e
Show file tree
Hide file tree
Showing 28 changed files with 1,069 additions and 298 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/bvt.yml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ jobs:
runs-on: ${{ matrix.os }}
strategy:
matrix:
os: [ubuntu-latest, macos-latest]
os: [ubuntu-latest, macos-latest, windows-latest]
steps:
- name: Checkout
uses: actions/checkout@v3
Expand All @@ -22,7 +22,7 @@ jobs:
runs-on: ${{ matrix.os }}
strategy:
matrix:
os: [ubuntu-latest, macos-latest]
os: [ubuntu-latest, macos-latest, windows-latest]
steps:
- name: Checkout
uses: actions/checkout@v3
Expand Down
7 changes: 6 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,17 +17,22 @@ nix = "0.23.0"
log = "0.4"
byteorder = "1.3.2"
thiserror = "1.0"

async-trait = { version = "0.1.31", optional = true }
tokio = { version = "1", features = ["rt", "sync", "io-util", "macros", "time"], optional = true }
futures = { version = "0.3", optional = true }

[target.'cfg(windows)'.dependencies]
windows-sys = {version = "0.45", features = [ "Win32_Foundation", "Win32_Storage_FileSystem", "Win32_System_IO", "Win32_System_Pipes", "Win32_Security", "Win32_System_Threading"]}

[target.'cfg(any(target_os = "linux", target_os = "android"))'.dependencies]
tokio-vsock = { version = "0.3.1", optional = true }

[build-dependencies]
protobuf-codegen = "3.1.0"

[dev-dependencies]
assert_cmd = "2.0.7"

[features]
default = ["sync"]
async = ["async-trait", "tokio", "futures", "tokio-vsock"]
Expand Down
7 changes: 6 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,13 @@ build: debug

.PHONY: test
test:
ifeq ($OS,Windows_NT)
# async isn't enabled for windows, don't test that feature
cargo test --verbose
else
cargo test --all-features --verbose

endif

.PHONY: check
check:
cargo fmt --all -- --check
Expand Down
9 changes: 8 additions & 1 deletion example/async-client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,18 @@

mod protocols;
mod utils;

#[cfg(unix)]
use protocols::r#async::{agent, agent_ttrpc, health, health_ttrpc};
use ttrpc::context::{self, Context};
#[cfg(unix)]
use ttrpc::r#async::Client;

#[cfg(windows)]
fn main() {
println!("This example only works on Unix-like OSes");
}

#[cfg(unix)]
#[tokio::main(flavor = "current_thread")]
async fn main() {
let c = Client::connect(utils::SOCK_ADDR).unwrap();
Expand Down
13 changes: 12 additions & 1 deletion example/async-server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,17 +13,22 @@ use std::sync::Arc;

use log::LevelFilter;

#[cfg(unix)]
use protocols::r#async::{agent, agent_ttrpc, health, health_ttrpc, types};
#[cfg(unix)]
use ttrpc::asynchronous::Server;
use ttrpc::error::{Error, Result};
use ttrpc::proto::{Code, Status};

#[cfg(unix)]
use async_trait::async_trait;
#[cfg(unix)]
use tokio::signal::unix::{signal, SignalKind};
use tokio::time::sleep;

struct HealthService;

#[cfg(unix)]
#[async_trait]
impl health_ttrpc::Health for HealthService {
async fn check(
Expand Down Expand Up @@ -58,7 +63,7 @@ impl health_ttrpc::Health for HealthService {
}

struct AgentService;

#[cfg(unix)]
#[async_trait]
impl agent_ttrpc::AgentService for AgentService {
async fn list_interfaces(
Expand All @@ -82,6 +87,12 @@ impl agent_ttrpc::AgentService for AgentService {
}
}

#[cfg(windows)]
fn main() {
println!("This example only works on Unix-like OSes");
}

#[cfg(unix)]
#[tokio::main(flavor = "current_thread")]
async fn main() {
simple_logging::log_to_stderr(LevelFilter::Trace);
Expand Down
15 changes: 14 additions & 1 deletion example/async-stream-client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,18 @@

mod protocols;
mod utils;

#[cfg(unix)]
use protocols::r#async::{empty, streaming, streaming_ttrpc};
use ttrpc::context::{self, Context};
#[cfg(unix)]
use ttrpc::r#async::Client;

#[cfg(windows)]
fn main() {
println!("This example only works on Unix-like OSes");
}

#[cfg(unix)]
#[tokio::main(flavor = "current_thread")]
async fn main() {
simple_logging::log_to_stderr(log::LevelFilter::Info);
Expand Down Expand Up @@ -48,6 +55,7 @@ fn default_ctx() -> Context {
ctx
}

#[cfg(unix)]
async fn echo_request(cli: streaming_ttrpc::StreamingClient) {
let echo1 = streaming::EchoPayload {
seq: 1,
Expand All @@ -59,6 +67,7 @@ async fn echo_request(cli: streaming_ttrpc::StreamingClient) {
assert_eq!(resp.seq, echo1.seq + 1);
}

#[cfg(unix)]
async fn echo_stream(cli: streaming_ttrpc::StreamingClient) {
let mut stream = cli.echo_stream(default_ctx()).await.unwrap();

Expand All @@ -81,6 +90,7 @@ async fn echo_stream(cli: streaming_ttrpc::StreamingClient) {
assert!(matches!(ret, Err(ttrpc::Error::Eof)));
}

#[cfg(unix)]
async fn sum_stream(cli: streaming_ttrpc::StreamingClient) {
let mut stream = cli.sum_stream(default_ctx()).await.unwrap();

Expand Down Expand Up @@ -108,6 +118,7 @@ async fn sum_stream(cli: streaming_ttrpc::StreamingClient) {
assert_eq!(ssum.num, sum.num);
}

#[cfg(unix)]
async fn divide_stream(cli: streaming_ttrpc::StreamingClient) {
let expected = streaming::Sum {
sum: 392,
Expand All @@ -127,6 +138,7 @@ async fn divide_stream(cli: streaming_ttrpc::StreamingClient) {
assert_eq!(actual.num, expected.num);
}

#[cfg(unix)]
async fn echo_null(cli: streaming_ttrpc::StreamingClient) {
let mut stream = cli.echo_null(default_ctx()).await.unwrap();

Expand All @@ -142,6 +154,7 @@ async fn echo_null(cli: streaming_ttrpc::StreamingClient) {
assert_eq!(res, empty::Empty::new());
}

#[cfg(unix)]
async fn echo_null_stream(cli: streaming_ttrpc::StreamingClient) {
let stream = cli.echo_null_stream(default_ctx()).await.unwrap();

Expand Down
11 changes: 11 additions & 0 deletions example/async-stream-server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,15 +10,20 @@ use std::sync::Arc;

use log::{info, LevelFilter};

#[cfg(unix)]
use protocols::r#async::{empty, streaming, streaming_ttrpc};
#[cfg(unix)]
use ttrpc::asynchronous::Server;

#[cfg(unix)]
use async_trait::async_trait;
#[cfg(unix)]
use tokio::signal::unix::{signal, SignalKind};
use tokio::time::sleep;

struct StreamingService;

#[cfg(unix)]
#[async_trait]
impl streaming_ttrpc::Streaming for StreamingService {
async fn echo(
Expand Down Expand Up @@ -131,6 +136,12 @@ impl streaming_ttrpc::Streaming for StreamingService {
}
}

#[cfg(windows)]
fn main() {
println!("This example only works on Unix-like OSes");
}

#[cfg(unix)]
#[tokio::main(flavor = "current_thread")]
async fn main() {
simple_logging::log_to_stderr(LevelFilter::Info);
Expand Down
4 changes: 2 additions & 2 deletions example/build.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ fn main() {
async_all: true,
..Default::default()
})
.rust_protobuf_customize(protobuf_customized.clone())
.rust_protobuf_customize(protobuf_customized)
.run()
.expect("Gen async code failed.");

Expand Down Expand Up @@ -75,7 +75,7 @@ fn replace_text_in_file(file_name: &str, from: &str, to: &str) -> Result<(), std

let new_contents = contents.replace(from, to);

let mut dst = File::create(&file_name)?;
let mut dst = File::create(file_name)?;
dst.write(new_contents.as_bytes())?;

Ok(())
Expand Down
77 changes: 55 additions & 22 deletions example/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,17 @@
mod protocols;
mod utils;

use log::LevelFilter;
use protocols::sync::{agent, agent_ttrpc, health, health_ttrpc};
use std::thread;
use ttrpc::context::{self, Context};
use ttrpc::error::Error;
use ttrpc::proto::Code;
use ttrpc::Client;

fn main() {
simple_logging::log_to_stderr(LevelFilter::Trace);

let c = Client::connect(utils::SOCK_ADDR).unwrap();
let hc = health_ttrpc::HealthClient::new(c.clone());
let ac = agent_ttrpc::AgentServiceClient::new(c);
Expand All @@ -33,69 +38,97 @@ fn main() {
let t = thread::spawn(move || {
let req = health::CheckRequest::new();
println!(
"OS Thread {:?} - {} started: {:?}",
"OS Thread {:?} - health.check() started: {:?}",
std::thread::current().id(),
"health.check()",
now.elapsed(),
);

let rsp = thc.check(default_ctx(), &req);
match rsp.as_ref() {
Err(Error::RpcStatus(s)) => {
assert_eq!(Code::NOT_FOUND, s.code());
assert_eq!("Just for fun".to_string(), s.message())
}
Err(e) => {
panic!("not expecting an error from the example server: {:?}", e)
}
Ok(x) => {
panic!("not expecting a OK response from the example server: {:?}", x)
}
}
println!(
"OS Thread {:?} - {} -> {:?} ended: {:?}",
"OS Thread {:?} - health.check() -> {:?} ended: {:?}",
std::thread::current().id(),
"health.check()",
thc.check(default_ctx(), &req),
rsp,
now.elapsed(),
);
});

let t2 = thread::spawn(move || {
println!(
"OS Thread {:?} - {} started: {:?}",
"OS Thread {:?} - agent.list_interfaces() started: {:?}",
std::thread::current().id(),
"agent.list_interfaces()",
now.elapsed(),
);

let show = match tac.list_interfaces(default_ctx(), &agent::ListInterfacesRequest::new()) {
Err(e) => format!("{:?}", e),
Ok(s) => format!("{:?}", s),
Err(e) => {
panic!("not expecting an error from the example server: {:?}", e)
}
Ok(s) => {
assert_eq!("first".to_string(), s.Interfaces[0].name);
assert_eq!("second".to_string(), s.Interfaces[1].name);
format!("{s:?}")
}
};

println!(
"OS Thread {:?} - {} -> {} ended: {:?}",
"OS Thread {:?} - agent.list_interfaces() -> {} ended: {:?}",
std::thread::current().id(),
"agent.list_interfaces()",
show,
now.elapsed(),
);
});

println!(
"Main OS Thread - {} started: {:?}",
"agent.online_cpu_mem()",
"Main OS Thread - agent.online_cpu_mem() started: {:?}",
now.elapsed()
);
let show = match ac.online_cpu_mem(default_ctx(), &agent::OnlineCPUMemRequest::new()) {
Err(e) => format!("{:?}", e),
Ok(s) => format!("{:?}", s),
Err(Error::RpcStatus(s)) => {
assert_eq!(Code::NOT_FOUND, s.code());
assert_eq!(
"/grpc.AgentService/OnlineCPUMem is not supported".to_string(),
s.message()
);
format!("{s:?}")
}
Err(e) => {
panic!("not expecting an error from the example server: {:?}", e)
}
Ok(s) => {
panic!("not expecting a OK response from the example server: {:?}", s)
}
};
println!(
"Main OS Thread - {} -> {} ended: {:?}",
"agent.online_cpu_mem()",
"Main OS Thread - agent.online_cpu_mem() -> {} ended: {:?}",
show,
now.elapsed()
);

println!("\nsleep 2 seconds ...\n");
thread::sleep(std::time::Duration::from_secs(2));

let version = hc.version(default_ctx(), &health::CheckRequest::new());
assert_eq!("mock.0.1", version.as_ref().unwrap().agent_version.as_str());
assert_eq!("0.0.1", version.as_ref().unwrap().grpc_version.as_str());
println!(
"Main OS Thread - {} started: {:?}",
"health.version()",
"Main OS Thread - health.version() started: {:?}",
now.elapsed()
);
println!(
"Main OS Thread - {} -> {:?} ended: {:?}",
"health.version()",
hc.version(default_ctx(), &health::CheckRequest::new()),
"Main OS Thread - health.version() -> {:?} ended: {:?}",
version,
now.elapsed()
);

Expand Down
5 changes: 3 additions & 2 deletions example/protocols/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@
//
// SPDX-License-Identifier: Apache-2.0
//

#[cfg(unix)]
pub mod asynchronous;
pub mod sync;
#[cfg(unix)]
pub use asynchronous as r#async;
pub mod sync;
Loading

0 comments on commit ebbb58e

Please sign in to comment.