Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(transport): Make transport server and channel independent #1630

Merged
merged 4 commits into from
Jun 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion .github/workflows/CI.yml
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,10 @@ jobs:
with:
tool: protoc@${{ env.PROTOC_VERSION }}
- uses: Swatinem/rust-cache@v2
- run: cargo hack udeps --workspace --each-feature ${{ matrix.option }}
- run: cargo hack udeps --workspace --exclude-features tls --each-feature ${{ matrix.option }}
- run: cargo udeps --package tonic --features tls,transport
- run: cargo udeps --package tonic --features tls,server
- run: cargo udeps --package tonic --features tls,channel

check:
runs-on: ${{ matrix.os }}
Expand All @@ -81,6 +84,8 @@ jobs:
- uses: Swatinem/rust-cache@v2
- name: Check features
run: cargo hack check --workspace --no-private --each-feature --no-dev-deps
- name: Check tonic feature powerset
run: cargo hack check --package tonic --feature-powerset --depth 2
- name: Check all targets
run: cargo check --workspace --all-targets --all-features

Expand Down
18 changes: 10 additions & 8 deletions tonic/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,29 +26,31 @@ version = "0.11.0"
codegen = ["dep:async-trait"]
gzip = ["dep:flate2"]
zstd = ["dep:zstd"]
default = ["channel", "codegen", "prost"]
default = ["transport", "codegen", "prost"]
prost = ["dep:prost"]
tls = ["dep:rustls-pemfile", "transport", "dep:tokio-rustls", "dep:tokio", "tokio?/rt", "tokio?/macros"]
tls = ["dep:rustls-pemfile", "dep:tokio-rustls", "dep:tokio", "tokio?/rt", "tokio?/macros"]
tls-roots = ["tls-roots-common", "dep:rustls-native-certs"]
tls-roots-common = ["tls", "channel"]
tls-webpki-roots = ["tls-roots-common", "dep:webpki-roots"]
router = ["dep:axum"]
transport = [
server = [
"router",
"dep:async-stream",
"dep:h2",
"dep:hyper", "dep:hyper-util",
"dep:hyper", "hyper?/server",
"dep:hyper-util", "hyper-util?/service", "hyper-util?/server-auto",
"dep:socket2",
"dep:tokio", "tokio?/macros", "tokio?/net", "tokio?/time",
"dep:tower", "tower?/util", "tower?/limit",
]
channel = [
"transport",
"dep:hyper", "hyper?/client",
"dep:hyper-util", "hyper-util?/client-legacy",
"dep:tower", "tower?/balance", "tower?/buffer", "tower?/discover", "tower?/load", "tower?/make",
"dep:tower", "tower?/balance", "tower?/buffer", "tower?/discover", "tower?/limit",
"dep:tokio", "tokio?/time",
"dep:hyper-timeout",
]
transport = ["server", "channel"]

# [[bench]]
# name = "bench_main"
Expand Down Expand Up @@ -76,8 +78,8 @@ async-trait = {version = "0.1.13", optional = true}
# transport
async-stream = {version = "0.3", optional = true}
h2 = {version = "0.4", optional = true}
hyper = {version = "1", features = ["http1", "http2", "server"], optional = true}
hyper-util = { version = ">=0.1.4, <0.2", features = ["service", "server-auto", "tokio"], optional = true }
hyper = {version = "1", features = ["http1", "http2"], optional = true}
hyper-util = { version = ">=0.1.4, <0.2", features = ["tokio"], optional = true }
socket2 = { version = ">=0.4.7, <0.6.0", optional = true, features = ["all"] }
tokio = {version = "1", default-features = false, optional = true}
tokio-stream = { version = "0.1", features = ["net"] }
Expand Down
9 changes: 5 additions & 4 deletions tonic/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,10 @@
//!
//! # Feature Flags
//!
//! - `transport`: Enables just the full featured server portion of the `channel` feature.
//! - `channel`: Enables the fully featured, batteries included client and server
//! - `transport`: Enables the fully featured, batteries included client and server
//! implementation based on [`hyper`], [`tower`] and [`tokio`]. Enabled by default.
//! - `server`: Enables just the full featured server portion of the `transport` feature.
//! - `channel`: Enables just the full featured channel portion of the `transport` feature.
//! - `codegen`: Enables all the required exports and optional dependencies required
//! for [`tonic-build`]. Enabled by default.
//! - `tls`: Enables the `rustls` based TLS options for the `transport` feature. Not
Expand Down Expand Up @@ -100,8 +101,8 @@ pub mod metadata;
pub mod server;
pub mod service;

#[cfg(feature = "transport")]
#[cfg_attr(docsrs, doc(cfg(feature = "transport")))]
#[cfg(any(feature = "server", feature = "channel"))]
#[cfg_attr(docsrs, doc(cfg(any(feature = "server", feature = "channel"))))]
pub mod transport;

mod extensions;
Expand Down
22 changes: 11 additions & 11 deletions tonic/src/request.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,15 @@
use crate::metadata::{MetadataMap, MetadataValue};
#[cfg(feature = "transport")]
#[cfg(feature = "server")]
use crate::transport::server::TcpConnectInfo;
#[cfg(feature = "tls")]
#[cfg(all(feature = "server", feature = "tls"))]
use crate::transport::server::TlsConnectInfo;
use http::Extensions;
#[cfg(feature = "transport")]
#[cfg(feature = "server")]
use std::net::SocketAddr;
#[cfg(feature = "tls")]
#[cfg(all(feature = "server", feature = "tls"))]
use std::sync::Arc;
use std::time::Duration;
#[cfg(feature = "tls")]
#[cfg(all(feature = "server", feature = "tls"))]
use tokio_rustls::rustls::pki_types::CertificateDer;
use tokio_stream::Stream;

Expand Down Expand Up @@ -211,8 +211,8 @@ impl<T> Request<T> {
/// This will return `None` if the `IO` type used
/// does not implement `Connected` or when using a unix domain socket.
/// This currently only works on the server side.
#[cfg(feature = "transport")]
#[cfg_attr(docsrs, doc(cfg(feature = "transport")))]
#[cfg(feature = "server")]
#[cfg_attr(docsrs, doc(cfg(feature = "server")))]
pub fn local_addr(&self) -> Option<SocketAddr> {
let addr = self
.extensions()
Expand All @@ -234,8 +234,8 @@ impl<T> Request<T> {
/// This will return `None` if the `IO` type used
/// does not implement `Connected` or when using a unix domain socket.
/// This currently only works on the server side.
#[cfg(feature = "transport")]
#[cfg_attr(docsrs, doc(cfg(feature = "transport")))]
#[cfg(feature = "server")]
#[cfg_attr(docsrs, doc(cfg(feature = "server")))]
pub fn remote_addr(&self) -> Option<SocketAddr> {
let addr = self
.extensions()
Expand All @@ -258,8 +258,8 @@ impl<T> Request<T> {
/// and is mostly used for mTLS. This currently only returns
/// `Some` on the server side of the `transport` server with
/// TLS enabled connections.
#[cfg(feature = "tls")]
#[cfg_attr(docsrs, doc(cfg(feature = "tls")))]
#[cfg(all(feature = "server", feature = "tls"))]
#[cfg_attr(docsrs, doc(all(feature = "server", feature = "tls")))]
pub fn peer_certs(&self) -> Option<Arc<Vec<CertificateDer<'static>>>> {
self.extensions()
.get::<TlsConnectInfo<TcpConnectInfo>>()
Expand Down
2 changes: 1 addition & 1 deletion tonic/src/service/router.rs
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ struct AxumBodyService<S> {
service: S,
}

pub(crate) type BoxFuture<'a, T> = Pin<Box<dyn Future<Output = T> + Send + 'a>>;
type BoxFuture<'a, T> = Pin<Box<dyn Future<Output = T> + Send + 'a>>;

impl<S> Service<Request<axum::body::Body>> for AxumBodyService<S>
where
Expand Down
25 changes: 12 additions & 13 deletions tonic/src/status.rs
Original file line number Diff line number Diff line change
Expand Up @@ -305,7 +305,6 @@ impl Status {
Status::new(Code::Unauthenticated, message)
}

#[cfg_attr(not(feature = "transport"), allow(dead_code))]
pub(crate) fn from_error_generic(
err: impl Into<Box<dyn Error + Send + Sync + 'static>>,
) -> Status {
Expand All @@ -316,7 +315,6 @@ impl Status {
///
/// Inspects the error source chain for recognizable errors, including statuses, HTTP2, and
/// hyper, and attempts to maps them to a `Status`, or else returns an Unknown `Status`.
#[cfg_attr(not(feature = "transport"), allow(dead_code))]
pub fn from_error(err: Box<dyn Error + Send + Sync + 'static>) -> Status {
Status::try_from_error(err).unwrap_or_else(|err| {
let mut status = Status::new(Code::Unknown, err.to_string());
Expand All @@ -342,7 +340,7 @@ impl Status {
Err(err) => err,
};

#[cfg(feature = "transport")]
#[cfg(feature = "server")]
let err = match err.downcast::<h2::Error>() {
Ok(h2) => {
return Ok(Status::from_h2_error(h2));
Expand All @@ -359,7 +357,7 @@ impl Status {
}

// FIXME: bubble this into `transport` and expose generic http2 reasons.
#[cfg(feature = "transport")]
#[cfg(feature = "server")]
fn from_h2_error(err: Box<h2::Error>) -> Status {
let code = Self::code_from_h2(&err);

Expand All @@ -368,7 +366,7 @@ impl Status {
status
}

#[cfg(feature = "transport")]
#[cfg(feature = "server")]
fn code_from_h2(err: &h2::Error) -> Code {
// See https://github.com/grpc/grpc/blob/3977c30/doc/PROTOCOL-HTTP2.md#errors
match err.reason() {
Expand All @@ -388,7 +386,7 @@ impl Status {
}
}

#[cfg(feature = "transport")]
#[cfg(feature = "server")]
fn to_h2_error(&self) -> h2::Error {
// conservatively transform to h2 error codes...
let reason = match self.code {
Expand All @@ -404,7 +402,7 @@ impl Status {
///
/// Returns Some if there's a way to handle the error, or None if the information from this
/// hyper error, but perhaps not its source, should be ignored.
#[cfg(feature = "transport")]
#[cfg(any(feature = "server", feature = "channel"))]
fn from_hyper_error(err: &hyper::Error) -> Option<Status> {
// is_timeout results from hyper's keep-alive logic
// (https://docs.rs/hyper/0.14.11/src/hyper/error.rs.html#192-194). Per the grpc spec
Expand All @@ -420,6 +418,7 @@ impl Status {
return Some(Status::cancelled(err.to_string()));
}

#[cfg(feature = "server")]
if let Some(h2_err) = err.source().and_then(|e| e.downcast_ref::<h2::Error>()) {
let code = Status::code_from_h2(h2_err);
let status = Self::new(code, format!("h2 protocol error: {}", err));
Expand Down Expand Up @@ -607,7 +606,7 @@ fn find_status_in_source_chain(err: &(dyn Error + 'static)) -> Option<Status> {
});
}

#[cfg(feature = "transport")]
#[cfg(feature = "server")]
if let Some(timeout) = err.downcast_ref::<crate::transport::TimeoutExpired>() {
return Some(Status::cancelled(timeout.to_string()));
}
Expand All @@ -624,7 +623,7 @@ fn find_status_in_source_chain(err: &(dyn Error + 'static)) -> Option<Status> {
return Some(Status::unavailable(connect.to_string()));
}

#[cfg(feature = "transport")]
#[cfg(any(feature = "server", feature = "channel"))]
if let Some(hyper) = err
.downcast_ref::<hyper::Error>()
.and_then(Status::from_hyper_error)
Expand Down Expand Up @@ -671,14 +670,14 @@ fn invalid_header_value_byte<Error: fmt::Display>(err: Error) -> Status {
)
}

#[cfg(feature = "transport")]
#[cfg(feature = "server")]
impl From<h2::Error> for Status {
fn from(err: h2::Error) -> Self {
Status::from_h2_error(Box::new(err))
}
}

#[cfg(feature = "transport")]
#[cfg(feature = "server")]
impl From<Status> for h2::Error {
fn from(status: Status) -> Self {
status.to_h2_error()
Expand Down Expand Up @@ -927,7 +926,7 @@ mod tests {
}

#[test]
#[cfg(feature = "transport")]
#[cfg(feature = "server")]
fn from_error_h2() {
use std::error::Error as _;

Expand All @@ -944,7 +943,7 @@ mod tests {
}

#[test]
#[cfg(feature = "transport")]
#[cfg(feature = "server")]
fn to_h2_error() {
let orig = Status::new(Code::Cancelled, "stop eet!");
let err = orig.to_h2_error();
Expand Down
3 changes: 2 additions & 1 deletion tonic/src/transport/channel/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ use tower::{
Service,
};

type BoxFuture<'a, T> = Pin<Box<dyn Future<Output = T> + Send + 'a>>;
type Svc = Either<Connection, BoxService<Request<BoxBody>, Response<BoxBody>, crate::Error>>;

const DEFAULT_BUFFER_SIZE: usize = 1024;
Expand Down Expand Up @@ -186,7 +187,7 @@ impl Channel {
D: Discover<Service = Connection> + Unpin + Send + 'static,
D::Error: Into<crate::Error>,
D::Key: Hash + Send + Clone,
E: Executor<crate::transport::BoxFuture<'static, ()>> + Send + Sync + 'static,
E: Executor<BoxFuture<'static, ()>> + Send + Sync + 'static,
{
let svc = Balance::new(discover);

Expand Down
2 changes: 1 addition & 1 deletion tonic/src/transport/channel/service/add_origin.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::transport::BoxFuture;
use crate::transport::channel::BoxFuture;
use http::uri::Authority;
use http::uri::Scheme;
use http::{Request, Uri};
Expand Down
2 changes: 1 addition & 1 deletion tonic/src/transport/channel/service/connection.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use super::{AddOrigin, Reconnect, SharedExec, UserAgent};
use crate::{
body::{boxed, BoxBody},
transport::{service::GrpcTimeout, BoxFuture, Endpoint},
transport::{channel::BoxFuture, service::GrpcTimeout, Endpoint},
};
use http::Uri;
use hyper::rt;
Expand Down
2 changes: 1 addition & 1 deletion tonic/src/transport/channel/service/connector.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use super::BoxedIo;
#[cfg(feature = "tls")]
use super::TlsConnector;
use crate::transport::BoxFuture;
use crate::transport::channel::BoxFuture;
use http::Uri;
use std::fmt;
use std::task::{Context, Poll};
Expand Down
2 changes: 1 addition & 1 deletion tonic/src/transport/channel/service/executor.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::transport::BoxFuture;
use crate::transport::channel::BoxFuture;
use std::{future::Future, sync::Arc};

pub(crate) use hyper::rt::Executor;
Expand Down
11 changes: 6 additions & 5 deletions tonic/src/transport/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@

#[cfg(feature = "channel")]
pub mod channel;
#[cfg(feature = "server")]
pub mod server;

mod error;
Expand All @@ -104,13 +105,16 @@ mod tls;
pub use self::channel::{Channel, Endpoint};
pub use self::error::Error;
#[doc(inline)]
#[cfg(feature = "server")]
pub use self::server::Server;
#[doc(inline)]
pub use self::service::grpc_timeout::TimeoutExpired;

#[cfg(feature = "tls")]
#[cfg_attr(docsrs, doc(cfg(feature = "tls")))]
pub use self::tls::Certificate;
#[cfg(feature = "server")]
#[cfg_attr(docsrs, doc(cfg(feature = "server")))]
pub use axum::{body::Body as AxumBoxBody, Router as AxumRouter};
pub use hyper::{body::Body, Uri};
#[cfg(feature = "tls")]
Expand All @@ -119,12 +123,9 @@ pub use tokio_rustls::rustls::pki_types::CertificateDer;
#[cfg(all(feature = "channel", feature = "tls"))]
#[cfg_attr(docsrs, doc(cfg(all(feature = "channel", feature = "tls"))))]
pub use self::channel::ClientTlsConfig;
#[cfg(feature = "tls")]
#[cfg_attr(docsrs, doc(cfg(feature = "tls")))]
#[cfg(all(feature = "server", feature = "tls"))]
#[cfg_attr(docsrs, doc(all(feature = "server", feature = "tls")))]
pub use self::server::ServerTlsConfig;
#[cfg(feature = "tls")]
#[cfg_attr(docsrs, doc(cfg(feature = "tls")))]
pub use self::tls::Identity;

#[cfg(feature = "channel")]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't understand how your commit message describes this change? It seems to remove this instance and makes another instance private, but this commit doesn't "move" anything in the sense of creating something new.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The definition of BoxFuture used in the implementation of the channel feature is moved to the channel module.

use crate::service::router::BoxFuture;
3 changes: 1 addition & 2 deletions tonic/src/transport/server/incoming.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
use super::{Connected, Server};
use crate::transport::service::ServerIo;
use super::{service::ServerIo, Connected, Server};
use std::{
net::{SocketAddr, TcpListener as StdTcpListener},
pin::{pin, Pin},
Expand Down
Loading