From 4dda4cbcca88fa46a7d8a6e4eabfb6d7c333617a Mon Sep 17 00:00:00 2001 From: David Pedersen Date: Tue, 18 May 2021 10:41:21 +0200 Subject: [PATCH] feat(tonic): Use `BoxBody` from `http-body` crate (#622) As of `http-body` 0.4.1 its has had a `BoxBody` type similar to `tonic::body::BoxBody`. It also has `Empty` and `Body::map_{data,err}`. That means all the custom body things we had in tonic can basically be replaced with that. Note that this is a breaking change so we should merge this next time we decide to ship a breaking release. The breaking changes are: - `tonic::body::Body` has been removed. I think its fine for users to depend directly on `http-body` if they need this trait. - `tonic::body::BoxBody` is now just a type alias for `http_body::combinators::BoxBody`. So the methods it previously had are gone. The replacements are - `tonic::body::Body::new` -> `http_body::Body::boxed` - `tonic::body::Body::map_from` -> `http_body::Body::map_data` and `http_body::Body::map_err` depending on which part you want to map. - `tonic::body::Body::empty` -> `http_body::Empty` Additionally a `Sync` bound has been added to a few methods. I actually don't think this is a breaking change because the old `tonic::body::Body` trait had `Sync` as a supertrait meaning the `Sync` requirement was already there. Fixes https://github.com/hyperium/tonic/issues/557 --- examples/Cargo.toml | 2 +- interop/Cargo.toml | 2 +- tonic-build/src/client.rs | 4 +- tonic-build/src/server.rs | 4 +- .../tests/proto/grpc.reflection.v1alpha.rs | 337 ------------------ tonic-reflection/tests/proto/mod.rs | 337 ------------------ tonic/Cargo.toml | 2 +- .../benchmarks/compiled_protos/helloworld.rs | 8 +- tonic/src/body.rs | 220 +----------- tonic/src/client/grpc.rs | 20 +- tonic/src/client/service.rs | 9 +- tonic/src/codec/decode.rs | 5 +- tonic/src/codegen.rs | 7 +- tonic/src/status.rs | 2 +- tonic/src/transport/server/mod.rs | 2 +- tonic/src/transport/server/recover_error.rs | 2 +- 16 files changed, 44 insertions(+), 919 deletions(-) delete mode 100644 tonic-reflection/tests/proto/grpc.reflection.v1alpha.rs delete mode 100644 tonic-reflection/tests/proto/mod.rs diff --git a/examples/Cargo.toml b/examples/Cargo.toml index a706bbec5..083f21174 100644 --- a/examples/Cargo.toml +++ b/examples/Cargo.toml @@ -173,7 +173,7 @@ prost-types = "0.7" hyper = "0.14" warp = "0.3" http = "0.2" -http-body = "0.4" +http-body = "0.4.2" pin-project = "1.0" # Health example tonic-health = { path = "../tonic-health" } diff --git a/interop/Cargo.toml b/interop/Cargo.toml index 84bdb650c..bf2e1a892 100644 --- a/interop/Cargo.toml +++ b/interop/Cargo.toml @@ -26,7 +26,7 @@ http = "0.2" futures-core = "0.3" futures-util = "0.3" tower = { version = "0.4" } -http-body = "0.4" +http-body = "0.4.2" hyper = "0.14" console = "0.14" structopt = "0.3" diff --git a/tonic-build/src/client.rs b/tonic-build/src/client.rs index 5d1bd6717..e65248908 100644 --- a/tonic-build/src/client.rs +++ b/tonic-build/src/client.rs @@ -37,9 +37,9 @@ pub fn generate( impl #service_ident where T: tonic::client::GrpcService, - T::ResponseBody: Body + HttpBody + Send + 'static, + T::ResponseBody: Body + Send + Sync + 'static, T::Error: Into, - ::Error: Into + Send, { + ::Error: Into + Send, { pub fn new(inner: T) -> Self { let inner = tonic::client::Grpc::new(inner); Self { inner } diff --git a/tonic-build/src/server.rs b/tonic-build/src/server.rs index 8cb4866cc..170c79995 100644 --- a/tonic-build/src/server.rs +++ b/tonic-build/src/server.rs @@ -69,7 +69,7 @@ pub fn generate( impl Service> for #server_service where T: #server_trait, - B: HttpBody + Send + Sync + 'static, + B: Body + Send + Sync + 'static, B::Error: Into + Send + 'static, { type Response = http::Response; @@ -91,7 +91,7 @@ pub fn generate( .status(200) .header("grpc-status", "12") .header("content-type", "application/grpc") - .body(tonic::body::BoxBody::empty()) + .body(empty_body()) .unwrap()) }), } diff --git a/tonic-reflection/tests/proto/grpc.reflection.v1alpha.rs b/tonic-reflection/tests/proto/grpc.reflection.v1alpha.rs deleted file mode 100644 index bdd4cba96..000000000 --- a/tonic-reflection/tests/proto/grpc.reflection.v1alpha.rs +++ /dev/null @@ -1,337 +0,0 @@ -/// The message sent by the client when calling ServerReflectionInfo method. -#[derive(Clone, PartialEq, ::prost::Message)] -pub struct ServerReflectionRequest { - #[prost(string, tag = "1")] - pub host: ::prost::alloc::string::String, - /// To use reflection service, the client should set one of the following - /// fields in message_request. The server distinguishes requests by their - /// defined field and then handles them using corresponding methods. - #[prost( - oneof = "server_reflection_request::MessageRequest", - tags = "3, 4, 5, 6, 7" - )] - pub message_request: ::core::option::Option, -} -/// Nested message and enum types in `ServerReflectionRequest`. -pub mod server_reflection_request { - /// To use reflection service, the client should set one of the following - /// fields in message_request. The server distinguishes requests by their - /// defined field and then handles them using corresponding methods. - #[derive(Clone, PartialEq, ::prost::Oneof)] - pub enum MessageRequest { - /// Find a proto file by the file name. - #[prost(string, tag = "3")] - FileByFilename(::prost::alloc::string::String), - /// Find the proto file that declares the given fully-qualified symbol name. - /// This field should be a fully-qualified symbol name - /// (e.g. .[.] or .). - #[prost(string, tag = "4")] - FileContainingSymbol(::prost::alloc::string::String), - /// Find the proto file which defines an extension extending the given - /// message type with the given field number. - #[prost(message, tag = "5")] - FileContainingExtension(super::ExtensionRequest), - /// Finds the tag numbers used by all known extensions of extendee_type, and - /// appends them to ExtensionNumberResponse in an undefined order. - /// Its corresponding method is best-effort: it's not guaranteed that the - /// reflection service will implement this method, and it's not guaranteed - /// that this method will provide all extensions. Returns - /// StatusCode::UNIMPLEMENTED if it's not implemented. - /// This field should be a fully-qualified type name. The format is - /// . - #[prost(string, tag = "6")] - AllExtensionNumbersOfType(::prost::alloc::string::String), - /// List the full names of registered services. The content will not be - /// checked. - #[prost(string, tag = "7")] - ListServices(::prost::alloc::string::String), - } -} -/// The type name and extension number sent by the client when requesting -/// file_containing_extension. -#[derive(Clone, PartialEq, ::prost::Message)] -pub struct ExtensionRequest { - /// Fully-qualified type name. The format should be . - #[prost(string, tag = "1")] - pub containing_type: ::prost::alloc::string::String, - #[prost(int32, tag = "2")] - pub extension_number: i32, -} -/// The message sent by the server to answer ServerReflectionInfo method. -#[derive(Clone, PartialEq, ::prost::Message)] -pub struct ServerReflectionResponse { - #[prost(string, tag = "1")] - pub valid_host: ::prost::alloc::string::String, - #[prost(message, optional, tag = "2")] - pub original_request: ::core::option::Option, - /// The server sets one of the following fields according to the - /// message_request in the request. - #[prost( - oneof = "server_reflection_response::MessageResponse", - tags = "4, 5, 6, 7" - )] - pub message_response: ::core::option::Option, -} -/// Nested message and enum types in `ServerReflectionResponse`. -pub mod server_reflection_response { - /// The server sets one of the following fields according to the - /// message_request in the request. - #[derive(Clone, PartialEq, ::prost::Oneof)] - pub enum MessageResponse { - /// This message is used to answer file_by_filename, file_containing_symbol, - /// file_containing_extension requests with transitive dependencies. - /// As the repeated label is not allowed in oneof fields, we use a - /// FileDescriptorResponse message to encapsulate the repeated fields. - /// The reflection service is allowed to avoid sending FileDescriptorProtos - /// that were previously sent in response to earlier requests in the stream. - #[prost(message, tag = "4")] - FileDescriptorResponse(super::FileDescriptorResponse), - /// This message is used to answer all_extension_numbers_of_type requests. - #[prost(message, tag = "5")] - AllExtensionNumbersResponse(super::ExtensionNumberResponse), - /// This message is used to answer list_services requests. - #[prost(message, tag = "6")] - ListServicesResponse(super::ListServiceResponse), - /// This message is used when an error occurs. - #[prost(message, tag = "7")] - ErrorResponse(super::ErrorResponse), - } -} -/// Serialized FileDescriptorProto messages sent by the server answering -/// a file_by_filename, file_containing_symbol, or file_containing_extension -/// request. -#[derive(Clone, PartialEq, ::prost::Message)] -pub struct FileDescriptorResponse { - /// Serialized FileDescriptorProto messages. We avoid taking a dependency on - /// descriptor.proto, which uses proto2 only features, by making them opaque - /// bytes instead. - #[prost(bytes = "vec", repeated, tag = "1")] - pub file_descriptor_proto: ::prost::alloc::vec::Vec<::prost::alloc::vec::Vec>, -} -/// A list of extension numbers sent by the server answering -/// all_extension_numbers_of_type request. -#[derive(Clone, PartialEq, ::prost::Message)] -pub struct ExtensionNumberResponse { - /// Full name of the base type, including the package name. The format - /// is . - #[prost(string, tag = "1")] - pub base_type_name: ::prost::alloc::string::String, - #[prost(int32, repeated, tag = "2")] - pub extension_number: ::prost::alloc::vec::Vec, -} -/// A list of ServiceResponse sent by the server answering list_services request. -#[derive(Clone, PartialEq, ::prost::Message)] -pub struct ListServiceResponse { - /// The information of each service may be expanded in the future, so we use - /// ServiceResponse message to encapsulate it. - #[prost(message, repeated, tag = "1")] - pub service: ::prost::alloc::vec::Vec, -} -/// The information of a single service used by ListServiceResponse to answer -/// list_services request. -#[derive(Clone, PartialEq, ::prost::Message)] -pub struct ServiceResponse { - /// Full name of a registered service, including its package name. The format - /// is . - #[prost(string, tag = "1")] - pub name: ::prost::alloc::string::String, -} -/// The error code and error message sent by the server when an error occurs. -#[derive(Clone, PartialEq, ::prost::Message)] -pub struct ErrorResponse { - /// This field uses the error codes defined in grpc::StatusCode. - #[prost(int32, tag = "1")] - pub error_code: i32, - #[prost(string, tag = "2")] - pub error_message: ::prost::alloc::string::String, -} -#[doc = r" Generated client implementations."] -pub mod server_reflection_client { - #![allow(unused_variables, dead_code, missing_docs)] - use tonic::codegen::*; - pub struct ServerReflectionClient { - inner: tonic::client::Grpc, - } - impl ServerReflectionClient { - #[doc = r" Attempt to create a new client by connecting to a given endpoint."] - pub async fn connect(dst: D) -> Result - where - D: std::convert::TryInto, - D::Error: Into, - { - let conn = tonic::transport::Endpoint::new(dst)?.connect().await?; - Ok(Self::new(conn)) - } - } - impl ServerReflectionClient - where - T: tonic::client::GrpcService, - T::ResponseBody: Body + HttpBody + Send + 'static, - T::Error: Into, - ::Error: Into + Send, - { - pub fn new(inner: T) -> Self { - let inner = tonic::client::Grpc::new(inner); - Self { inner } - } - pub fn with_interceptor(inner: T, interceptor: impl Into) -> Self { - let inner = tonic::client::Grpc::with_interceptor(inner, interceptor); - Self { inner } - } - #[doc = " The reflection service is structured as a bidirectional stream, ensuring"] - #[doc = " all related requests go to a single server."] - pub async fn server_reflection_info( - &mut self, - request: impl tonic::IntoStreamingRequest, - ) -> Result< - tonic::Response>, - tonic::Status, - > { - self.inner.ready().await.map_err(|e| { - tonic::Status::new( - tonic::Code::Unknown, - format!("Service was not ready: {}", e.into()), - ) - })?; - let codec = tonic::codec::ProstCodec::default(); - let path = http::uri::PathAndQuery::from_static( - "/grpc.reflection.v1alpha.ServerReflection/ServerReflectionInfo", - ); - self.inner - .streaming(request.into_streaming_request(), path, codec) - .await - } - } - impl Clone for ServerReflectionClient { - fn clone(&self) -> Self { - Self { - inner: self.inner.clone(), - } - } - } - impl std::fmt::Debug for ServerReflectionClient { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - write!(f, "ServerReflectionClient {{ ... }}") - } - } -} -#[doc = r" Generated server implementations."] -pub mod server_reflection_server { - #![allow(unused_variables, dead_code, missing_docs)] - use tonic::codegen::*; - #[doc = "Generated trait containing gRPC methods that should be implemented for use with ServerReflectionServer."] - #[async_trait] - pub trait ServerReflection: Send + Sync + 'static { - #[doc = "Server streaming response type for the ServerReflectionInfo method."] - type ServerReflectionInfoStream: Stream> - + Send - + Sync - + 'static; - #[doc = " The reflection service is structured as a bidirectional stream, ensuring"] - #[doc = " all related requests go to a single server."] - async fn server_reflection_info( - &self, - request: tonic::Request>, - ) -> Result, tonic::Status>; - } - #[derive(Debug)] - pub struct ServerReflectionServer { - inner: _Inner, - } - struct _Inner(Arc, Option); - impl ServerReflectionServer { - pub fn new(inner: T) -> Self { - let inner = Arc::new(inner); - let inner = _Inner(inner, None); - Self { inner } - } - pub fn with_interceptor(inner: T, interceptor: impl Into) -> Self { - let inner = Arc::new(inner); - let inner = _Inner(inner, Some(interceptor.into())); - Self { inner } - } - } - impl Service> for ServerReflectionServer - where - T: ServerReflection, - B: HttpBody + Send + Sync + 'static, - B::Error: Into + Send + 'static, - { - type Response = http::Response; - type Error = Never; - type Future = BoxFuture; - fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll> { - Poll::Ready(Ok(())) - } - fn call(&mut self, req: http::Request) -> Self::Future { - let inner = self.inner.clone(); - match req.uri().path() { - "/grpc.reflection.v1alpha.ServerReflection/ServerReflectionInfo" => { - #[allow(non_camel_case_types)] - struct ServerReflectionInfoSvc(pub Arc); - impl - tonic::server::StreamingService - for ServerReflectionInfoSvc - { - type Response = super::ServerReflectionResponse; - type ResponseStream = T::ServerReflectionInfoStream; - type Future = - BoxFuture, tonic::Status>; - fn call( - &mut self, - request: tonic::Request< - tonic::Streaming, - >, - ) -> Self::Future { - let inner = self.0.clone(); - let fut = async move { (*inner).server_reflection_info(request).await }; - Box::pin(fut) - } - } - let inner = self.inner.clone(); - let fut = async move { - let interceptor = inner.1; - let inner = inner.0; - let method = ServerReflectionInfoSvc(inner); - let codec = tonic::codec::ProstCodec::default(); - let mut grpc = if let Some(interceptor) = interceptor { - tonic::server::Grpc::with_interceptor(codec, interceptor) - } else { - tonic::server::Grpc::new(codec) - }; - let res = grpc.streaming(method, req).await; - Ok(res) - }; - Box::pin(fut) - } - _ => Box::pin(async move { - Ok(http::Response::builder() - .status(200) - .header("grpc-status", "12") - .header("content-type", "application/grpc") - .body(tonic::body::BoxBody::empty()) - .unwrap()) - }), - } - } - } - impl Clone for ServerReflectionServer { - fn clone(&self) -> Self { - let inner = self.inner.clone(); - Self { inner } - } - } - impl Clone for _Inner { - fn clone(&self) -> Self { - Self(self.0.clone(), self.1.clone()) - } - } - impl std::fmt::Debug for _Inner { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - write!(f, "{:?}", self.0) - } - } - impl tonic::transport::NamedService for ServerReflectionServer { - const NAME: &'static str = "grpc.reflection.v1alpha.ServerReflection"; - } -} diff --git a/tonic-reflection/tests/proto/mod.rs b/tonic-reflection/tests/proto/mod.rs deleted file mode 100644 index bdd4cba96..000000000 --- a/tonic-reflection/tests/proto/mod.rs +++ /dev/null @@ -1,337 +0,0 @@ -/// The message sent by the client when calling ServerReflectionInfo method. -#[derive(Clone, PartialEq, ::prost::Message)] -pub struct ServerReflectionRequest { - #[prost(string, tag = "1")] - pub host: ::prost::alloc::string::String, - /// To use reflection service, the client should set one of the following - /// fields in message_request. The server distinguishes requests by their - /// defined field and then handles them using corresponding methods. - #[prost( - oneof = "server_reflection_request::MessageRequest", - tags = "3, 4, 5, 6, 7" - )] - pub message_request: ::core::option::Option, -} -/// Nested message and enum types in `ServerReflectionRequest`. -pub mod server_reflection_request { - /// To use reflection service, the client should set one of the following - /// fields in message_request. The server distinguishes requests by their - /// defined field and then handles them using corresponding methods. - #[derive(Clone, PartialEq, ::prost::Oneof)] - pub enum MessageRequest { - /// Find a proto file by the file name. - #[prost(string, tag = "3")] - FileByFilename(::prost::alloc::string::String), - /// Find the proto file that declares the given fully-qualified symbol name. - /// This field should be a fully-qualified symbol name - /// (e.g. .[.] or .). - #[prost(string, tag = "4")] - FileContainingSymbol(::prost::alloc::string::String), - /// Find the proto file which defines an extension extending the given - /// message type with the given field number. - #[prost(message, tag = "5")] - FileContainingExtension(super::ExtensionRequest), - /// Finds the tag numbers used by all known extensions of extendee_type, and - /// appends them to ExtensionNumberResponse in an undefined order. - /// Its corresponding method is best-effort: it's not guaranteed that the - /// reflection service will implement this method, and it's not guaranteed - /// that this method will provide all extensions. Returns - /// StatusCode::UNIMPLEMENTED if it's not implemented. - /// This field should be a fully-qualified type name. The format is - /// . - #[prost(string, tag = "6")] - AllExtensionNumbersOfType(::prost::alloc::string::String), - /// List the full names of registered services. The content will not be - /// checked. - #[prost(string, tag = "7")] - ListServices(::prost::alloc::string::String), - } -} -/// The type name and extension number sent by the client when requesting -/// file_containing_extension. -#[derive(Clone, PartialEq, ::prost::Message)] -pub struct ExtensionRequest { - /// Fully-qualified type name. The format should be . - #[prost(string, tag = "1")] - pub containing_type: ::prost::alloc::string::String, - #[prost(int32, tag = "2")] - pub extension_number: i32, -} -/// The message sent by the server to answer ServerReflectionInfo method. -#[derive(Clone, PartialEq, ::prost::Message)] -pub struct ServerReflectionResponse { - #[prost(string, tag = "1")] - pub valid_host: ::prost::alloc::string::String, - #[prost(message, optional, tag = "2")] - pub original_request: ::core::option::Option, - /// The server sets one of the following fields according to the - /// message_request in the request. - #[prost( - oneof = "server_reflection_response::MessageResponse", - tags = "4, 5, 6, 7" - )] - pub message_response: ::core::option::Option, -} -/// Nested message and enum types in `ServerReflectionResponse`. -pub mod server_reflection_response { - /// The server sets one of the following fields according to the - /// message_request in the request. - #[derive(Clone, PartialEq, ::prost::Oneof)] - pub enum MessageResponse { - /// This message is used to answer file_by_filename, file_containing_symbol, - /// file_containing_extension requests with transitive dependencies. - /// As the repeated label is not allowed in oneof fields, we use a - /// FileDescriptorResponse message to encapsulate the repeated fields. - /// The reflection service is allowed to avoid sending FileDescriptorProtos - /// that were previously sent in response to earlier requests in the stream. - #[prost(message, tag = "4")] - FileDescriptorResponse(super::FileDescriptorResponse), - /// This message is used to answer all_extension_numbers_of_type requests. - #[prost(message, tag = "5")] - AllExtensionNumbersResponse(super::ExtensionNumberResponse), - /// This message is used to answer list_services requests. - #[prost(message, tag = "6")] - ListServicesResponse(super::ListServiceResponse), - /// This message is used when an error occurs. - #[prost(message, tag = "7")] - ErrorResponse(super::ErrorResponse), - } -} -/// Serialized FileDescriptorProto messages sent by the server answering -/// a file_by_filename, file_containing_symbol, or file_containing_extension -/// request. -#[derive(Clone, PartialEq, ::prost::Message)] -pub struct FileDescriptorResponse { - /// Serialized FileDescriptorProto messages. We avoid taking a dependency on - /// descriptor.proto, which uses proto2 only features, by making them opaque - /// bytes instead. - #[prost(bytes = "vec", repeated, tag = "1")] - pub file_descriptor_proto: ::prost::alloc::vec::Vec<::prost::alloc::vec::Vec>, -} -/// A list of extension numbers sent by the server answering -/// all_extension_numbers_of_type request. -#[derive(Clone, PartialEq, ::prost::Message)] -pub struct ExtensionNumberResponse { - /// Full name of the base type, including the package name. The format - /// is . - #[prost(string, tag = "1")] - pub base_type_name: ::prost::alloc::string::String, - #[prost(int32, repeated, tag = "2")] - pub extension_number: ::prost::alloc::vec::Vec, -} -/// A list of ServiceResponse sent by the server answering list_services request. -#[derive(Clone, PartialEq, ::prost::Message)] -pub struct ListServiceResponse { - /// The information of each service may be expanded in the future, so we use - /// ServiceResponse message to encapsulate it. - #[prost(message, repeated, tag = "1")] - pub service: ::prost::alloc::vec::Vec, -} -/// The information of a single service used by ListServiceResponse to answer -/// list_services request. -#[derive(Clone, PartialEq, ::prost::Message)] -pub struct ServiceResponse { - /// Full name of a registered service, including its package name. The format - /// is . - #[prost(string, tag = "1")] - pub name: ::prost::alloc::string::String, -} -/// The error code and error message sent by the server when an error occurs. -#[derive(Clone, PartialEq, ::prost::Message)] -pub struct ErrorResponse { - /// This field uses the error codes defined in grpc::StatusCode. - #[prost(int32, tag = "1")] - pub error_code: i32, - #[prost(string, tag = "2")] - pub error_message: ::prost::alloc::string::String, -} -#[doc = r" Generated client implementations."] -pub mod server_reflection_client { - #![allow(unused_variables, dead_code, missing_docs)] - use tonic::codegen::*; - pub struct ServerReflectionClient { - inner: tonic::client::Grpc, - } - impl ServerReflectionClient { - #[doc = r" Attempt to create a new client by connecting to a given endpoint."] - pub async fn connect(dst: D) -> Result - where - D: std::convert::TryInto, - D::Error: Into, - { - let conn = tonic::transport::Endpoint::new(dst)?.connect().await?; - Ok(Self::new(conn)) - } - } - impl ServerReflectionClient - where - T: tonic::client::GrpcService, - T::ResponseBody: Body + HttpBody + Send + 'static, - T::Error: Into, - ::Error: Into + Send, - { - pub fn new(inner: T) -> Self { - let inner = tonic::client::Grpc::new(inner); - Self { inner } - } - pub fn with_interceptor(inner: T, interceptor: impl Into) -> Self { - let inner = tonic::client::Grpc::with_interceptor(inner, interceptor); - Self { inner } - } - #[doc = " The reflection service is structured as a bidirectional stream, ensuring"] - #[doc = " all related requests go to a single server."] - pub async fn server_reflection_info( - &mut self, - request: impl tonic::IntoStreamingRequest, - ) -> Result< - tonic::Response>, - tonic::Status, - > { - self.inner.ready().await.map_err(|e| { - tonic::Status::new( - tonic::Code::Unknown, - format!("Service was not ready: {}", e.into()), - ) - })?; - let codec = tonic::codec::ProstCodec::default(); - let path = http::uri::PathAndQuery::from_static( - "/grpc.reflection.v1alpha.ServerReflection/ServerReflectionInfo", - ); - self.inner - .streaming(request.into_streaming_request(), path, codec) - .await - } - } - impl Clone for ServerReflectionClient { - fn clone(&self) -> Self { - Self { - inner: self.inner.clone(), - } - } - } - impl std::fmt::Debug for ServerReflectionClient { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - write!(f, "ServerReflectionClient {{ ... }}") - } - } -} -#[doc = r" Generated server implementations."] -pub mod server_reflection_server { - #![allow(unused_variables, dead_code, missing_docs)] - use tonic::codegen::*; - #[doc = "Generated trait containing gRPC methods that should be implemented for use with ServerReflectionServer."] - #[async_trait] - pub trait ServerReflection: Send + Sync + 'static { - #[doc = "Server streaming response type for the ServerReflectionInfo method."] - type ServerReflectionInfoStream: Stream> - + Send - + Sync - + 'static; - #[doc = " The reflection service is structured as a bidirectional stream, ensuring"] - #[doc = " all related requests go to a single server."] - async fn server_reflection_info( - &self, - request: tonic::Request>, - ) -> Result, tonic::Status>; - } - #[derive(Debug)] - pub struct ServerReflectionServer { - inner: _Inner, - } - struct _Inner(Arc, Option); - impl ServerReflectionServer { - pub fn new(inner: T) -> Self { - let inner = Arc::new(inner); - let inner = _Inner(inner, None); - Self { inner } - } - pub fn with_interceptor(inner: T, interceptor: impl Into) -> Self { - let inner = Arc::new(inner); - let inner = _Inner(inner, Some(interceptor.into())); - Self { inner } - } - } - impl Service> for ServerReflectionServer - where - T: ServerReflection, - B: HttpBody + Send + Sync + 'static, - B::Error: Into + Send + 'static, - { - type Response = http::Response; - type Error = Never; - type Future = BoxFuture; - fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll> { - Poll::Ready(Ok(())) - } - fn call(&mut self, req: http::Request) -> Self::Future { - let inner = self.inner.clone(); - match req.uri().path() { - "/grpc.reflection.v1alpha.ServerReflection/ServerReflectionInfo" => { - #[allow(non_camel_case_types)] - struct ServerReflectionInfoSvc(pub Arc); - impl - tonic::server::StreamingService - for ServerReflectionInfoSvc - { - type Response = super::ServerReflectionResponse; - type ResponseStream = T::ServerReflectionInfoStream; - type Future = - BoxFuture, tonic::Status>; - fn call( - &mut self, - request: tonic::Request< - tonic::Streaming, - >, - ) -> Self::Future { - let inner = self.0.clone(); - let fut = async move { (*inner).server_reflection_info(request).await }; - Box::pin(fut) - } - } - let inner = self.inner.clone(); - let fut = async move { - let interceptor = inner.1; - let inner = inner.0; - let method = ServerReflectionInfoSvc(inner); - let codec = tonic::codec::ProstCodec::default(); - let mut grpc = if let Some(interceptor) = interceptor { - tonic::server::Grpc::with_interceptor(codec, interceptor) - } else { - tonic::server::Grpc::new(codec) - }; - let res = grpc.streaming(method, req).await; - Ok(res) - }; - Box::pin(fut) - } - _ => Box::pin(async move { - Ok(http::Response::builder() - .status(200) - .header("grpc-status", "12") - .header("content-type", "application/grpc") - .body(tonic::body::BoxBody::empty()) - .unwrap()) - }), - } - } - } - impl Clone for ServerReflectionServer { - fn clone(&self) -> Self { - let inner = self.inner.clone(); - Self { inner } - } - } - impl Clone for _Inner { - fn clone(&self) -> Self { - Self(self.0.clone(), self.1.clone()) - } - } - impl std::fmt::Debug for _Inner { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - write!(f, "{:?}", self.0) - } - } - impl tonic::transport::NamedService for ServerReflectionServer { - const NAME: &'static str = "grpc.reflection.v1alpha.ServerReflection"; - } -} diff --git a/tonic/Cargo.toml b/tonic/Cargo.toml index 4c5e535da..eb023a959 100644 --- a/tonic/Cargo.toml +++ b/tonic/Cargo.toml @@ -54,7 +54,7 @@ percent-encoding = "2.1" tower-service = "0.3" tokio-util = { version = "0.6", features = ["codec"] } async-stream = "0.3" -http-body = "0.4" +http-body = "0.4.2" pin-project = "1.0" # prost diff --git a/tonic/benches-disabled/benchmarks/compiled_protos/helloworld.rs b/tonic/benches-disabled/benchmarks/compiled_protos/helloworld.rs index 79afc2b6f..dcfce39d5 100755 --- a/tonic/benches-disabled/benchmarks/compiled_protos/helloworld.rs +++ b/tonic/benches-disabled/benchmarks/compiled_protos/helloworld.rs @@ -32,10 +32,10 @@ pub mod client { impl GreeterClient where T: tonic::client::GrpcService, - T::ResponseBody: Body + HttpBody + Send + 'static, + T::ResponseBody: Body + Send + 'static, T::Error: Into, - ::Error: Into + Send, - ::Data: Into + Send, + ::Error: Into + Send, + ::Data: Into + Send, { pub fn new(inner: T) -> Self { let inner = tonic::client::Grpc::new(inner); @@ -158,7 +158,7 @@ pub mod server { Ok(http::Response::builder() .status(200) .header("grpc-status", "12") - .body(tonic::body::BoxBody::empty()) + .body(empty_body()) .unwrap()) }), } diff --git a/tonic/src/body.rs b/tonic/src/body.rs index 7753086ac..9f5f35b2d 100644 --- a/tonic/src/body.rs +++ b/tonic/src/body.rs @@ -1,218 +1,12 @@ //! HTTP specific body utilities. -//! -//! This module contains traits and helper types to work with http bodies. Most -//! of the types in this module are based around [`http_body::Body`]. -use crate::{Error, Status}; -use bytes::{Buf, Bytes}; -use http_body::Body as HttpBody; -use std::{ - fmt, - pin::Pin, - task::{Context, Poll}, -}; +use http_body::Body; -/// A trait alias for [`http_body::Body`]. -pub trait Body: sealed::Sealed + Send + Sync { - /// The body data type. - type Data: Buf; - /// The errors produced from the body. - type Error: Into; +/// A type erased HTTP body used for tonic services. +pub type BoxBody = http_body::combinators::BoxBody; - /// Check if the stream is over or not. - /// - /// Reference [`http_body::Body::is_end_stream`]. - fn is_end_stream(&self) -> bool; - - /// Poll for more data from the body. - /// - /// Reference [`http_body::Body::poll_data`]. - fn poll_data( - self: Pin<&mut Self>, - cx: &mut Context<'_>, - ) -> Poll>>; - - /// Poll for the trailing headers. - /// - /// Reference [`http_body::Body::poll_trailers`]. - fn poll_trailers( - self: Pin<&mut Self>, - cx: &mut Context<'_>, - ) -> Poll, Self::Error>>; -} - -impl Body for T -where - T: HttpBody + Send + Sync + 'static, - T::Error: Into, -{ - type Data = T::Data; - type Error = T::Error; - - fn is_end_stream(&self) -> bool { - HttpBody::is_end_stream(self) - } - - fn poll_data( - self: Pin<&mut Self>, - cx: &mut Context<'_>, - ) -> Poll>> { - HttpBody::poll_data(self, cx) - } - - fn poll_trailers( - self: Pin<&mut Self>, - cx: &mut Context<'_>, - ) -> Poll, Self::Error>> { - HttpBody::poll_trailers(self, cx) - } -} - -impl sealed::Sealed for T -where - T: HttpBody, - T::Error: Into, -{ -} - -mod sealed { - pub trait Sealed {} -} - -/// A type erased http body. -pub struct BoxBody { - inner: Pin + Send + Sync + 'static>>, -} - -struct MapBody(B); - -impl BoxBody { - /// Create a new `BoxBody` mapping item and error to the default types. - pub fn new(inner: B) -> Self - where - B: Body + Send + Sync + 'static, - { - BoxBody { - inner: Box::pin(inner), - } - } - - /// Create a new `BoxBody` mapping item and error to the default types. - pub fn map_from(inner: B) -> Self - where - B: Body + Send + Sync + 'static, - B::Error: Into, - { - BoxBody { - inner: Box::pin(MapBody(inner)), - } - } - - /// Create a new `BoxBody` that is empty. - pub fn empty() -> Self { - BoxBody { - inner: Box::pin(EmptyBody::default()), - } - } -} - -impl HttpBody for BoxBody { - type Data = Bytes; - type Error = Status; - - fn is_end_stream(&self) -> bool { - self.inner.is_end_stream() - } - - fn poll_data( - mut self: Pin<&mut Self>, - cx: &mut Context<'_>, - ) -> Poll>> { - Body::poll_data(self.inner.as_mut(), cx) - } - - fn poll_trailers( - mut self: Pin<&mut Self>, - cx: &mut Context<'_>, - ) -> Poll, Self::Error>> { - Body::poll_trailers(self.inner.as_mut(), cx) - } -} - -impl HttpBody for MapBody -where - B: Body, - B::Error: Into, -{ - type Data = Bytes; - type Error = Status; - - fn is_end_stream(&self) -> bool { - self.0.is_end_stream() - } - - fn poll_data( - self: Pin<&mut Self>, - cx: &mut Context<'_>, - ) -> Poll>> { - let v = unsafe { - let me = self.get_unchecked_mut(); - Pin::new_unchecked(&mut me.0).poll_data(cx) - }; - match futures_util::ready!(v) { - Some(Ok(mut i)) => Poll::Ready(Some(Ok(i.copy_to_bytes(i.remaining())))), - Some(Err(e)) => { - let err = Status::map_error(e.into()); - Poll::Ready(Some(Err(err))) - } - None => Poll::Ready(None), - } - } - - fn poll_trailers( - self: Pin<&mut Self>, - cx: &mut Context<'_>, - ) -> Poll, Self::Error>> { - let v = unsafe { - let me = self.get_unchecked_mut(); - Pin::new_unchecked(&mut me.0).poll_trailers(cx) - }; - - let v = futures_util::ready!(v).map_err(|e| Status::from_error(&*e.into())); - Poll::Ready(v) - } -} - -impl fmt::Debug for BoxBody { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.debug_struct("BoxBody").finish() - } -} - -#[derive(Debug, Default)] -struct EmptyBody { - _p: (), -} - -impl HttpBody for EmptyBody { - type Data = Bytes; - type Error = Status; - - fn is_end_stream(&self) -> bool { - true - } - - fn poll_data( - self: Pin<&mut Self>, - _cx: &mut Context<'_>, - ) -> Poll>> { - Poll::Ready(None) - } - - fn poll_trailers( - self: Pin<&mut Self>, - _cx: &mut Context<'_>, - ) -> Poll, Self::Error>> { - Poll::Ready(Ok(None)) - } +// this also exists in `crate::codegen` but we need it here since `codegen` has +// `#[cfg(feature = "codegen")]`. +pub(crate) fn empty_body() -> BoxBody { + http_body::Empty::new().map_err(|err| match err {}).boxed() } diff --git a/tonic/src/client/grpc.rs b/tonic/src/client/grpc.rs index 4142be8d0..a249fb757 100644 --- a/tonic/src/client/grpc.rs +++ b/tonic/src/client/grpc.rs @@ -1,5 +1,5 @@ use crate::{ - body::{Body, BoxBody}, + body::BoxBody, client::GrpcService, codec::{encode_client, Codec, Streaming}, interceptor::Interceptor, @@ -11,7 +11,7 @@ use http::{ header::{HeaderValue, CONTENT_TYPE, TE}, uri::{Parts, PathAndQuery, Uri}, }; -use http_body::Body as HttpBody; +use http_body::Body; use std::fmt; /// A gRPC client dispatcher. @@ -71,8 +71,8 @@ impl Grpc { ) -> Result, Status> where T: GrpcService, - T::ResponseBody: Body + HttpBody + Send + 'static, - ::Error: Into, + T::ResponseBody: Body + Send + Sync + 'static, + ::Error: Into, C: Codec, M1: Send + Sync + 'static, M2: Send + Sync + 'static, @@ -90,8 +90,8 @@ impl Grpc { ) -> Result, Status> where T: GrpcService, - T::ResponseBody: Body + HttpBody + Send + 'static, - ::Error: Into, + T::ResponseBody: Body + Send + Sync + 'static, + ::Error: Into, S: Stream + Send + Sync + 'static, C: Codec, M1: Send + Sync + 'static, @@ -127,8 +127,8 @@ impl Grpc { ) -> Result>, Status> where T: GrpcService, - T::ResponseBody: Body + HttpBody + Send + 'static, - ::Error: Into, + T::ResponseBody: Body + Send + Sync + 'static, + ::Error: Into, C: Codec, M1: Send + Sync + 'static, M2: Send + Sync + 'static, @@ -146,8 +146,8 @@ impl Grpc { ) -> Result>, Status> where T: GrpcService, - T::ResponseBody: Body + HttpBody + Send + 'static, - ::Error: Into, + T::ResponseBody: Body + Send + Sync + 'static, + ::Error: Into, S: Stream + Send + Sync + 'static, C: Codec, M1: Send + Sync + 'static, diff --git a/tonic/src/client/service.rs b/tonic/src/client/service.rs index c7511ed4a..8f300c366 100644 --- a/tonic/src/client/service.rs +++ b/tonic/src/client/service.rs @@ -1,5 +1,4 @@ -use crate::body::Body; -use http_body::Body as HttpBody; +use http_body::Body; use std::future::Future; use std::task::{Context, Poll}; use tower_service::Service; @@ -13,7 +12,7 @@ use tower_service::Service; /// [`tower_service`]: https://docs.rs/tower-service pub trait GrpcService { /// Responses body given by the service. - type ResponseBody: Body + HttpBody; + type ResponseBody: Body; /// Errors produced by the service. type Error: Into; /// The future response value. @@ -34,8 +33,8 @@ impl GrpcService for T where T: Service, Response = http::Response>, T::Error: Into, - ResBody: Body + HttpBody, - ::Error: Into, + ResBody: Body, + ::Error: Into, { type ResponseBody = ResBody; type Error = T::Error; diff --git a/tonic/src/codec/decode.rs b/tonic/src/codec/decode.rs index dbebfe21e..73f3427af 100644 --- a/tonic/src/codec/decode.rs +++ b/tonic/src/codec/decode.rs @@ -79,7 +79,10 @@ impl Streaming { { Self { decoder: Box::new(decoder), - body: BoxBody::map_from(body), + body: body + .map_data(|mut buf| buf.copy_to_bytes(buf.remaining())) + .map_err(|err| Status::map_error(err.into())) + .boxed(), state: State::ReadHeader, direction, buf: BytesMut::with_capacity(BUFFER_SIZE), diff --git a/tonic/src/codegen.rs b/tonic/src/codegen.rs index cb146929c..5cb4221b4 100644 --- a/tonic/src/codegen.rs +++ b/tonic/src/codegen.rs @@ -4,14 +4,13 @@ pub use async_trait::async_trait; pub use futures_core; pub use futures_util::future::{ok, poll_fn, Ready}; -pub use http_body::Body as HttpBody; pub use std::future::Future; pub use std::pin::Pin; pub use std::sync::Arc; pub use std::task::{Context, Poll}; pub use tower_service::Service; pub type StdError = Box; -pub use crate::body::Body; +pub use http_body::Body; pub type BoxFuture = self::Pin> + Send + 'static>>; pub type BoxStream = @@ -31,3 +30,7 @@ impl std::fmt::Display for Never { } impl std::error::Error for Never {} + +pub fn empty_body() -> crate::body::BoxBody { + http_body::Empty::new().map_err(|err| match err {}).boxed() +} diff --git a/tonic/src/status.rs b/tonic/src/status.rs index 599ad6b9e..674be2c56 100644 --- a/tonic/src/status.rs +++ b/tonic/src/status.rs @@ -520,7 +520,7 @@ impl Status { self.add_header(&mut parts.headers).unwrap(); - http::Response::from_parts(parts, BoxBody::empty()) + http::Response::from_parts(parts, crate::body::empty_body()) } } diff --git a/tonic/src/transport/server/mod.rs b/tonic/src/transport/server/mod.rs index 7417532ac..e4beece0f 100644 --- a/tonic/src/transport/server/mod.rs +++ b/tonic/src/transport/server/mod.rs @@ -712,7 +712,7 @@ impl Service> for Unimplemented { .status(200) .header("grpc-status", "12") .header("content-type", "application/grpc") - .body(BoxBody::empty()) + .body(crate::body::empty_body()) .unwrap(), ) } diff --git a/tonic/src/transport/server/recover_error.rs b/tonic/src/transport/server/recover_error.rs index 9b4ff2e67..2004560c2 100644 --- a/tonic/src/transport/server/recover_error.rs +++ b/tonic/src/transport/server/recover_error.rs @@ -63,7 +63,7 @@ where Ok(res) => Poll::Ready(Ok(res)), Err(err) => { if let Some(status) = Status::try_from_error(&*err) { - let mut res = Response::new(BoxBody::empty()); + let mut res = Response::new(crate::body::empty_body()); status.add_header(res.headers_mut()).unwrap(); Poll::Ready(Ok(res)) } else {