Skip to content

Commit

Permalink
opt: multiplex pool
Browse files Browse the repository at this point in the history
  • Loading branch information
bobozhengsir committed Jan 8, 2024
1 parent 6a3aaad commit 4f1f762
Show file tree
Hide file tree
Showing 8 changed files with 293 additions and 127 deletions.
18 changes: 15 additions & 3 deletions examples/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,12 @@ path = "src/multiplex/grpc_server.rs"
[[bin]]
name = "multiplex-grpc-client"
path = "src/multiplex/grpc_client.rs"
[[bin]]
name = "multiplex-thrift-server"
path = "src/multiplex/thrift_server.rs"
[[bin]]
name = "multiplex-thrift-client"
path = "src/multiplex/thrift_client.rs"

# streaming
[[bin]]
Expand Down Expand Up @@ -94,15 +100,21 @@ tracing-subscriber.workspace = true
pilota.workspace = true
volo = { path = "../volo" }
volo-grpc = { path = "../volo-grpc" }
volo-thrift = { path = "../volo-thrift" }
volo-thrift = { path = "../volo-thrift", features = ["multiplex"] }
volo-http = { path = "../volo-http", features = ["full"] }

volo-gen = { path = "./volo-gen" }

# TLS dependencies
librustls = { workspace = true, optional = true}
librustls = { workspace = true, optional = true }
rustls-pemfile = { workspace = true, optional = true }
tokio-rustls = { workspace = true, optional = true }

[features]
tls = ["librustls", "rustls-pemfile", "tokio-rustls", "volo/rustls", "volo-grpc/rustls"]
tls = [
"librustls",
"rustls-pemfile",
"tokio-rustls",
"volo/rustls",
"volo-grpc/rustls",
]
49 changes: 49 additions & 0 deletions examples/src/multiplex/thrift_client.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
use std::{future, net::SocketAddr};

use lazy_static::lazy_static;
use volo_thrift::client::CallOpt;

lazy_static! {
static ref CLIENT: volo_gen::thrift_gen::hello::HelloServiceClient = {
let addr: SocketAddr = "127.0.0.1:8081".parse().unwrap();
volo_gen::thrift_gen::hello::HelloServiceClientBuilder::new("hello")
.address(addr)
.multiplex(true)
.build()
};
}

pub struct LogService<S>(S);

#[volo::main]
async fn main() {
let subscriber = tracing_subscriber::FmtSubscriber::builder()
.with_max_level(tracing::Level::TRACE)
.finish();
tracing::subscriber::set_global_default(subscriber).expect("setting default subscriber failed");

let futs = |i| async move {
let req = volo_gen::thrift_gen::hello::HelloRequest {
name: format!("volo{}", i).into(),
};
let resp = CLIENT
.clone()
.with_callopt(CallOpt::default())
.hello(req)
.await;
match resp {
Ok(info) => println!("{info:?}"),
Err(e) => eprintln!("{e:?}"),
}
};

let mut resps = Vec::with_capacity(10);
for i in 0..resps.capacity() {
resps.push(tokio::spawn(futs(i)));
}

for resp in resps {
let _ = resp.await;
}
future::pending::<()>().await;
}
32 changes: 32 additions & 0 deletions examples/src/multiplex/thrift_server.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
use std::net::SocketAddr;

pub struct S;

impl volo_gen::thrift_gen::hello::HelloService for S {
async fn hello(
&self,
req: volo_gen::thrift_gen::hello::HelloRequest,
) -> Result<volo_gen::thrift_gen::hello::HelloResponse, volo_thrift::AnyhowError> {
let resp = volo_gen::thrift_gen::hello::HelloResponse {
message: format!("Hello, {}!", req.name).into(),
};
Ok(resp)
}
}

#[volo::main]
async fn main() {
let subscriber = tracing_subscriber::FmtSubscriber::builder()
.with_max_level(tracing::Level::TRACE)
.finish();
tracing::subscriber::set_global_default(subscriber).expect("setting default subscriber failed");

let addr: SocketAddr = "[::]:8081".parse().unwrap();
let addr = volo::net::Address::from(addr);

volo_gen::thrift_gen::hello::HelloServiceServer::new(S)
.multiplex(true)
.run(addr)
.await
.unwrap();
}
2 changes: 1 addition & 1 deletion examples/volo-gen/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,4 +21,4 @@ volo-grpc = { path = "../../volo-grpc" }
volo-thrift = { path = "../../volo-thrift" }

[build-dependencies]
volo-build = { path = "../../volo-build"}
volo-build = { path = "../../volo-build" }
4 changes: 2 additions & 2 deletions volo-thrift/src/transport/multiplex/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use crate::{
protocol::TMessageType,
transport::{
multiplex::thrift_transport::ThriftTransport,
pool::{Config, PooledMakeTransport},
pool::{Config, PooledMakeTransport, Ver},
},
EntryMessage, Error, ThriftMessage,
};
Expand Down Expand Up @@ -137,7 +137,7 @@ where
})?;
let oneway = cx.message_type == TMessageType::OneWay;
cx.stats.record_make_transport_start_at();
let transport = self.make_transport.call(target).await?;
let transport = self.make_transport.call((target, Ver::Multiplex)).await?;
cx.stats.record_make_transport_end_at();
let resp = transport.send(cx, req, oneway).await;
if let Ok(None) = resp {
Expand Down
4 changes: 2 additions & 2 deletions volo-thrift/src/transport/pingpong/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use crate::{
protocol::TMessageType,
transport::{
pingpong::thrift_transport::ThriftTransport,
pool::{Config, PooledMakeTransport},
pool::{Config, PooledMakeTransport, Ver},
},
EntryMessage, ThriftMessage,
};
Expand Down Expand Up @@ -120,7 +120,7 @@ where
})?;
let oneway = cx.message_type == TMessageType::OneWay;
cx.stats.record_make_transport_start_at();
let mut transport = self.make_transport.call(target).await?;
let mut transport = self.make_transport.call((target, Ver::PingPong)).await?;
cx.stats.record_make_transport_end_at();
let resp = transport.send(cx, req, oneway).await;
if let Ok(None) = resp {
Expand Down
34 changes: 16 additions & 18 deletions volo-thrift/src/transport/pool/make_transport.rs
Original file line number Diff line number Diff line change
@@ -1,25 +1,25 @@
//! MakeTransport with pool
use std::{fmt::Debug, hash::Hash};

use motore::service::UnaryService;

use super::{Pool, Poolable, Pooled};
use super::{Key, Pool, Poolable, Pooled, Ver};

// pooled make transport wrap the inner MakeTransport and return the pooled transport
// when call make_transport
pub struct PooledMakeTransport<MT, Key>
pub struct PooledMakeTransport<MT, K: Key>
where
MT: UnaryService<Key>,
MT: UnaryService<K>,
<MT as UnaryService<K>>::Response: Poolable,
{
pub(crate) inner: MT,
pub(crate) pool: Pool<Key, MT::Response>,
pub(crate) pool: Pool<K, MT::Response>,
}

impl<MT, Key> Clone for PooledMakeTransport<MT, Key>
impl<MT, K: Key> Clone for PooledMakeTransport<MT, K>
where
MT: Clone,
MT: UnaryService<Key>,
MT: UnaryService<K>,
<MT as UnaryService<K>>::Response: Poolable,
{
fn clone(&self) -> Self {
PooledMakeTransport {
Expand All @@ -29,11 +29,10 @@ where
}
}

impl<MT, Key> PooledMakeTransport<MT, Key>
impl<MT, K: Key> PooledMakeTransport<MT, K>
where
MT: UnaryService<Key>,
MT: UnaryService<K>,
MT::Response: Poolable + Send + 'static,
Key: Clone + Eq + Hash + Debug + Send + 'static,
{
pub fn new(inner: MT, cfg: Option<super::Config>) -> Self {
Self {
Expand All @@ -43,19 +42,18 @@ where
}
}

impl<MT, Key> UnaryService<Key> for PooledMakeTransport<MT, Key>
impl<MT, K: Key> UnaryService<(K, Ver)> for PooledMakeTransport<MT, K>
where
Key: Clone + Eq + Hash + Debug + Send + 'static,
MT: UnaryService<Key> + Send + Clone + 'static + Sync,
MT: UnaryService<K> + Send + Clone + 'static + Sync,
MT::Response: Poolable + Send,
MT::Error: Into<crate::Error>,
MT::Error: Into<crate::Error> + Send,
{
type Response = Pooled<Key, MT::Response>;
type Response = Pooled<K, MT::Response>;

type Error = crate::Error;

async fn call(&self, key: Key) -> Result<Self::Response, Self::Error> {
async fn call(&self, kv: (K, Ver)) -> Result<Self::Response, Self::Error> {
let mt = self.inner.clone();
self.pool.get(key, mt).await.map_err(Into::into)
self.pool.get(kv.0, kv.1, mt).await.map_err(Into::into)
}
}
Loading

0 comments on commit 4f1f762

Please sign in to comment.