From c2871980d28efc6c57befd18262d76c5327cd693 Mon Sep 17 00:00:00 2001 From: Nicolas Belouin Date: Tue, 17 Oct 2023 11:00:22 +0200 Subject: [PATCH] Update tonic and prost Signed-off-by: Nicolas Belouin --- .github/workflows/check-rust.yml | 3 + .github/workflows/run-tarpaulin.yml | 4 +- Cargo.lock | 191 ++++--- agent/Cargo.toml | 6 +- agent/src/util/registration.rs | 2 +- agent/src/util/v1beta1.rs | 481 +++++++++++++----- discovery-handlers/debug-echo/Cargo.toml | 2 +- discovery-handlers/onvif/Cargo.toml | 2 +- discovery-handlers/opcua/Cargo.toml | 2 +- discovery-handlers/udev/Cargo.toml | 2 +- discovery-utils/Cargo.toml | 6 +- discovery-utils/src/discovery/v0.rs | 332 +++++++++--- samples/brokers/udev-video-broker/Cargo.toml | 6 +- .../udev-video-broker/src/util/camera.rs | 140 +++-- shared/Cargo.toml | 2 +- 15 files changed, 838 insertions(+), 343 deletions(-) diff --git a/.github/workflows/check-rust.yml b/.github/workflows/check-rust.yml index 89e585a55..0c9c3879f 100644 --- a/.github/workflows/check-rust.yml +++ b/.github/workflows/check-rust.yml @@ -36,11 +36,14 @@ jobs: toolchain: 1.73.0 components: clippy, rustfmt - name: Install Linux requirements + # TODO: When ubuntu-latest gets updated to >= 23.04 replace the wget+unzip with just protobuf-compiler in apt run: | apt_dependencies="git curl libssl-dev pkg-config libudev-dev libv4l-dev" echo "Run apt update and apt install the following dependencies: $apt_dependencies" sudo apt update sudo apt install -y $apt_dependencies + echo "Download and install recent enough protobuf compiler" + wget https://github.com/protocolbuffers/protobuf/releases/download/v24.3/protoc-24.3-linux-x86_64.zip && sudo unzip protoc-24.3-linux-x86_64.zip bin/protoc -d /usr - name: Check rust format run: cargo fmt --all -- --check - name: Check clippy diff --git a/.github/workflows/run-tarpaulin.yml b/.github/workflows/run-tarpaulin.yml index 66f433bce..6d902bd47 100644 --- a/.github/workflows/run-tarpaulin.yml +++ b/.github/workflows/run-tarpaulin.yml @@ -37,11 +37,11 @@ jobs: persist-credentials: false - name: Create tarpaulin instance - run: docker create --network host --security-opt seccomp=unconfined -v "${PWD}:/volume" xd009642/tarpaulin:0.25.1 bash -c "echo 'sleep 600m; echo bye' > /tmp/keep_alive.sh; chmod 777 /tmp/keep_alive.sh; /tmp/keep_alive.sh" > container_id.txt + run: docker create --network host --security-opt seccomp=unconfined -v "${PWD}:/volume" xd009642/tarpaulin:0.27.1 bash -c "echo 'sleep 600m; echo bye' > /tmp/keep_alive.sh; chmod 777 /tmp/keep_alive.sh; /tmp/keep_alive.sh" > container_id.txt - name: Start tarpaulin instance run: docker start $(cat container_id.txt) - name: Install linux requirement in tarpaulin instance - run: docker exec $(cat container_id.txt) sh -c "echo Run apt update and apt install the following dependencies - git curl libssl-dev pkg-config libudev-dev libv4l-dev ; apt update ; apt install -y git curl libssl-dev pkg-config libudev-dev libv4l-dev" + run: docker exec $(cat container_id.txt) sh -c "echo Run apt update and apt install the following dependencies - git curl libssl-dev pkg-config libudev-dev libv4l-dev protobuf-compiler ; apt update ; apt install -y git curl libssl-dev pkg-config libudev-dev libv4l-dev protobuf-compiler" - name: Install desired rust version run: docker exec $(cat container_id.txt) sh -c "rustup install $CARGO_VERSION" - name: Tell cargo to use desired rust version diff --git a/Cargo.lock b/Cargo.lock index e16f26d6f..67ece33d0 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -134,7 +134,7 @@ dependencies = [ "impl-more", "openssl", "pin-project-lite", - "rustls 0.21.7", + "rustls", "rustls-webpki", "tokio", "tokio-openssl", @@ -645,6 +645,51 @@ version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d468802bab17cbc0cc575e9b053f41e72aa36bfa6b7f55e3529ffa43161b97fa" +[[package]] +name = "axum" +version = "0.6.20" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3b829e4e32b91e643de6eafe82b1d90675f5874230191a4ffbc1b336dec4d6bf" +dependencies = [ + "async-trait", + "axum-core", + "bitflags 1.3.2", + "bytes", + "futures-util", + "http", + "http-body", + "hyper", + "itoa", + "matchit", + "memchr", + "mime", + "percent-encoding 2.3.0", + "pin-project-lite", + "rustversion", + "serde", + "sync_wrapper", + "tower", + "tower-layer", + "tower-service", +] + +[[package]] +name = "axum-core" +version = "0.3.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "759fa577a247914fd3f7f76d62972792636412fbfd634cd452f6a385a74d2d2c" +dependencies = [ + "async-trait", + "bytes", + "futures-util", + "http", + "http-body", + "mime", + "rustversion", + "tower-layer", + "tower-service", +] + [[package]] name = "backoff" version = "0.4.0" @@ -1169,9 +1214,9 @@ checksum = "25cbce373ec4653f1a01a31e8a5e5ec0c622dc27ff9c4e6606eefef5cbbed4a5" [[package]] name = "fixedbitset" -version = "0.2.0" +version = "0.4.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "37ab347416e802de484e4d03c7316c48f1ecb56574dfd4a46a80f173ce1de04d" +checksum = "0ce7134b9999ecaf8bcd65542e436736ef32ddca1b3e06094cb6ec5755203b80" [[package]] name = "flate2" @@ -1453,6 +1498,12 @@ dependencies = [ "unicode-segmentation", ] +[[package]] +name = "heck" +version = "0.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "95505c38b4572b2d910cecb0281560f54b440a19336cbbcb27bf6ce6adc6f5a8" + [[package]] name = "hermit-abi" version = "0.3.3" @@ -1967,6 +2018,12 @@ version = "0.1.10" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2532096657941c2fea9c289d370a250971c689d4f143798ff67113ec042024a5" +[[package]] +name = "matchit" +version = "0.7.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0e7465ac9959cc2b1404e8e2367b43684a6d13790fe23056cc8c6c5a6b7bcb94" + [[package]] name = "memchr" version = "2.6.4" @@ -2374,12 +2431,12 @@ dependencies = [ [[package]] name = "petgraph" -version = "0.5.1" +version = "0.6.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "467d164a6de56270bd7c4d070df81d07beace25012d5103ced4e9ff08d6afdb7" +checksum = "e1d3afd2628e69da2be385eb6f2fd57c8ac7977ceeff6dc166ff1657b0e386a9" dependencies = [ "fixedbitset", - "indexmap 1.9.3", + "indexmap 2.0.2", ] [[package]] @@ -2488,6 +2545,16 @@ dependencies = [ "termtree", ] +[[package]] +name = "prettyplease" +version = "0.2.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ae005bd773ab59b4725093fd7df83fd7892f7d8eafb48dbd7de6e024e4215f9d" +dependencies = [ + "proc-macro2", + "syn 2.0.38", +] + [[package]] name = "proc-macro2" version = "1.0.69" @@ -2530,9 +2597,9 @@ dependencies = [ [[package]] name = "prost" -version = "0.8.0" +version = "0.12.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "de5e2533f59d08fcf364fd374ebda0692a70bd6d7e66ef97f306f45c6c5d8020" +checksum = "f4fdd22f3b9c31b53c060df4a0613a1c7f062d4115a2b984dd15b1858f7e340d" dependencies = [ "bytes", "prost-derive", @@ -2540,42 +2607,45 @@ dependencies = [ [[package]] name = "prost-build" -version = "0.8.0" +version = "0.12.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "355f634b43cdd80724ee7848f95770e7e70eefa6dcf14fea676216573b8fd603" +checksum = "8bdf592881d821b83d471f8af290226c8d51402259e9bb5be7f9f8bdebbb11ac" dependencies = [ "bytes", - "heck", + "heck 0.4.1", "itertools", "log", "multimap", + "once_cell", "petgraph", + "prettyplease", "prost", "prost-types", + "regex", + "syn 2.0.38", "tempfile", "which", ] [[package]] name = "prost-derive" -version = "0.8.0" +version = "0.12.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "600d2f334aa05acb02a755e217ef1ab6dea4d51b58b7846588b747edec04efba" +checksum = "265baba7fabd416cf5078179f7d2cbeca4ce7a9041111900675ea7c4cb8a4c32" dependencies = [ "anyhow", "itertools", "proc-macro2", "quote", - "syn 1.0.109", + "syn 2.0.38", ] [[package]] name = "prost-types" -version = "0.8.0" +version = "0.12.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "603bbd6394701d13f3f25aada59c7de9d35a6a5887cfc156181234a44002771b" +checksum = "e081b29f63d83a4bc75cfc9f3fe424f9156cf92d8a4f0c9407cce9a1b67327cf" dependencies = [ - "bytes", "prost", ] @@ -2790,19 +2860,6 @@ dependencies = [ "windows-sys", ] -[[package]] -name = "rustls" -version = "0.19.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "35edb675feee39aec9c99fa5ff985081995a06d594114ae14cbe797ad7b7a6d7" -dependencies = [ - "base64 0.13.1", - "log", - "ring", - "sct 0.6.1", - "webpki", -] - [[package]] name = "rustls" version = "0.21.7" @@ -2812,7 +2869,7 @@ dependencies = [ "log", "ring", "rustls-webpki", - "sct 0.7.0", + "sct", ] [[package]] @@ -2834,6 +2891,12 @@ dependencies = [ "untrusted", ] +[[package]] +name = "rustversion" +version = "1.0.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7ffc183a10b4478d04cbbbfc96d0873219d962dd5accaff2ffbd4ceb7df837f4" + [[package]] name = "ryu" version = "1.0.15" @@ -2876,16 +2939,6 @@ version = "1.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "94143f37725109f92c262ed2cf5e59bce7498c01bcc1502d7b9afe439a4e9f49" -[[package]] -name = "sct" -version = "0.6.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b362b83898e0e69f38515b82ee15aa80636befe47c3b6d3d89a911e78fc228ce" -dependencies = [ - "ring", - "untrusted", -] - [[package]] name = "sct" version = "0.7.0" @@ -3150,6 +3203,12 @@ dependencies = [ "unicode-ident", ] +[[package]] +name = "sync_wrapper" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2047c6ded9c721764247e62cd3b03c09ffc529b2ba5b10ec482ae507a4a70160" + [[package]] name = "system-configuration" version = "0.5.1" @@ -3317,13 +3376,12 @@ dependencies = [ [[package]] name = "tokio-rustls" -version = "0.22.0" +version = "0.24.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bc6844de72e57df1980054b38be3a9f4702aba4858be64dd700181a8a6d0e1b6" +checksum = "c28327cf380ac148141087fbfb9de9d7bd4e84ab5d2c28fbc911d753de8a7081" dependencies = [ - "rustls 0.19.1", + "rustls", "tokio", - "webpki", ] [[package]] @@ -3380,16 +3438,15 @@ dependencies = [ [[package]] name = "tonic" -version = "0.5.2" +version = "0.10.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "796c5e1cd49905e65dd8e700d4cb1dffcbfdb4fc9d017de08c1a537afd83627c" +checksum = "d560933a0de61cf715926b9cac824d4c883c2c43142f787595e48280c40a1d0e" dependencies = [ "async-stream", "async-trait", - "base64 0.13.1", + "axum", + "base64 0.21.4", "bytes", - "futures-core", - "futures-util", "h2", "http", "http-body", @@ -3398,28 +3455,28 @@ dependencies = [ "percent-encoding 2.3.0", "pin-project", "prost", - "prost-derive", + "rustls", + "rustls-pemfile", "tokio", "tokio-rustls", "tokio-stream", - "tokio-util 0.6.10", "tower", "tower-layer", "tower-service", "tracing", - "tracing-futures", ] [[package]] name = "tonic-build" -version = "0.5.2" +version = "0.10.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "12b52d07035516c2b74337d2ac7746075e7dcae7643816c1b12c5ff8a7484c08" +checksum = "9d021fc044c18582b9a2408cd0dd05b1596e3ecdb5c4df822bb0183545683889" dependencies = [ + "prettyplease", "proc-macro2", "prost-build", "quote", - "syn 1.0.109", + "syn 2.0.38", ] [[package]] @@ -3506,16 +3563,6 @@ dependencies = [ "once_cell", ] -[[package]] -name = "tracing-futures" -version = "0.2.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "97d095ae15e245a057c8e8451bab9b3ee1e1f68e9ba2b4fbc18d0ac5237835f2" -dependencies = [ - "pin-project", - "tracing", -] - [[package]] name = "treediff" version = "4.0.2" @@ -3867,16 +3914,6 @@ dependencies = [ "serde_json", ] -[[package]] -name = "webpki" -version = "0.21.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b8e38c0608262c46d4a56202ebabdeb094cef7e560ca7a226c6bf055188aa4ea" -dependencies = [ - "ring", - "untrusted", -] - [[package]] name = "which" version = "4.4.2" @@ -4036,7 +4073,7 @@ version = "0.7.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5c0b0a4701f203ebaecce4971a6bb8575aa07b617bdc39ddfc6ffeff3a38530d" dependencies = [ - "heck", + "heck 0.3.3", "log", "proc-macro2", "quote", diff --git a/agent/Cargo.toml b/agent/Cargo.toml index dd0ec3f65..3f0ded758 100644 --- a/agent/Cargo.toml +++ b/agent/Cargo.toml @@ -29,18 +29,18 @@ lazy_static = "1.4" log = "0.4" mockall_double = "0.2.0" prometheus = { version = "0.12.0", features = ["process"] } -prost = "0.8.0" +prost = "0.12" serde = "1.0.104" serde_derive = "1.0.104" serde_json = "1.0.45" serde_yaml = { version = "0.8.11", optional = true } tokio = { version = "1.0", features = ["rt-multi-thread", "time", "fs", "macros", "net"] } tokio-stream = { version = "0.1", features = ["net"] } -tonic = "0.5.2" +tonic = "0.10" tower = "0.4.8" [build-dependencies] -tonic-build = "0.5.2" +tonic-build = "0.10" [dev-dependencies] # for testing using a simple discovery handler diff --git a/agent/src/util/registration.rs b/agent/src/util/registration.rs index 9bf6d8204..fc1c8c7ee 100644 --- a/agent/src/util/registration.rs +++ b/agent/src/util/registration.rs @@ -102,7 +102,7 @@ impl Registration for AgentRegistration { let endpoint = req.endpoint.clone(); let dh_endpoint = create_discovery_handler_endpoint( &endpoint, - EndpointType::from_i32(req.endpoint_type).unwrap(), + EndpointType::try_from(req.endpoint_type).unwrap(), ); info!( "register_discovery_handler - called with register request {:?}", diff --git a/agent/src/util/v1beta1.rs b/agent/src/util/v1beta1.rs index 6cea16015..c84581fee 100644 --- a/agent/src/util/v1beta1.rs +++ b/agent/src/util/v1beta1.rs @@ -1,9 +1,11 @@ +#[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] pub struct DevicePluginOptions { /// Indicates if PreStartContainer call is required before each container start #[prost(bool, tag = "1")] pub pre_start_required: bool, } +#[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] pub struct RegisterRequest { /// Version of the API the Device Plugin was built against @@ -20,11 +22,13 @@ pub struct RegisterRequest { #[prost(message, optional, tag = "4")] pub options: ::core::option::Option, } +#[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] pub struct Empty {} /// ListAndWatch returns a stream of List of Devices /// Whenever a Device state change or a Device disapears, ListAndWatch /// returns the new list +#[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] pub struct ListAndWatchResponse { #[prost(message, repeated, tag = "1")] @@ -32,9 +36,10 @@ pub struct ListAndWatchResponse { } /// E.g: /// struct Device { -/// ID: "GPU-fef8089b-4820-abfc-e83e-94318197576e", -/// State: "Healthy", -///} +/// ID: "GPU-fef8089b-4820-abfc-e83e-94318197576e", +/// State: "Healthy", +/// } +#[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] pub struct Device { /// A unique ID assigned by the device plugin used @@ -49,26 +54,30 @@ pub struct Device { /// - PreStartContainer is expected to be called before each container start if indicated by plugin during registration phase. /// - PreStartContainer allows kubelet to pass reinitialized devices to containers. /// - PreStartContainer allows Device Plugin to run device specific operations on -/// the Devices requested +/// the Devices requested +#[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] pub struct PreStartContainerRequest { #[prost(string, repeated, tag = "1")] pub devices_i_ds: ::prost::alloc::vec::Vec<::prost::alloc::string::String>, } /// PreStartContainerResponse will be send by plugin in response to PreStartContainerRequest +#[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] pub struct PreStartContainerResponse {} /// - Allocate is expected to be called during pod creation since allocation -/// failures for any container would result in pod startup failure. +/// failures for any container would result in pod startup failure. /// - Allocate allows kubelet to exposes additional artifacts in a pod's -/// environment as directed by the plugin. +/// environment as directed by the plugin. /// - Allocate allows Device Plugin to run device specific operations on -/// the Devices requested +/// the Devices requested +#[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] pub struct AllocateRequest { #[prost(message, repeated, tag = "1")] pub container_requests: ::prost::alloc::vec::Vec, } +#[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] pub struct ContainerAllocateRequest { #[prost(string, repeated, tag = "1")] @@ -82,11 +91,13 @@ pub struct ContainerAllocateRequest { /// Allocation on dev1 succeeds but allocation on dev2 fails. /// The Device plugin should send a ListAndWatch update and fail the /// Allocation request +#[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] pub struct AllocateResponse { #[prost(message, repeated, tag = "1")] pub container_responses: ::prost::alloc::vec::Vec, } +#[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] pub struct ContainerAllocateResponse { /// List of environment variable to be set in the container to access one of more devices. @@ -106,6 +117,7 @@ pub struct ContainerAllocateResponse { } /// Mount specifies a host volume to mount into a container. /// where device library or tools are installed on host and container +#[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] pub struct Mount { /// Path of the mount within the container. @@ -119,6 +131,7 @@ pub struct Mount { pub read_only: bool, } /// DeviceSpec specifies a host device to mount into a container. +#[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] pub struct DeviceSpec { /// Path of the device within the container. @@ -134,25 +147,26 @@ pub struct DeviceSpec { #[prost(string, tag = "3")] pub permissions: ::prost::alloc::string::String, } -#[doc = r" Generated client implementations."] +/// Generated client implementations. pub mod registration_client { #![allow(unused_variables, dead_code, missing_docs, clippy::let_unit_value)] + use tonic::codegen::http::Uri; use tonic::codegen::*; - #[doc = " Registration is the service advertised by the Kubelet"] - #[doc = " Only when Kubelet answers with a success code to a Register Request"] - #[doc = " may Device Plugins start their service"] - #[doc = " Registration may fail when device plugin version is not supported by"] - #[doc = " Kubelet or the registered resourceName is already taken by another"] - #[doc = " active device plugin. Device plugin is expected to terminate upon registration failure"] + /// Registration is the service advertised by the Kubelet + /// Only when Kubelet answers with a success code to a Register Request + /// may Device Plugins start their service + /// Registration may fail when device plugin version is not supported by + /// Kubelet or the registered resourceName is already taken by another + /// active device plugin. Device plugin is expected to terminate upon registration failure #[derive(Debug, Clone)] pub struct RegistrationClient { inner: tonic::client::Grpc, } impl RegistrationClient { - #[doc = r" Attempt to create a new client by connecting to a given endpoint."] + /// Attempt to create a new client by connecting to a given endpoint. pub async fn connect(dst: D) -> Result where - D: std::convert::TryInto, + D: TryInto, D::Error: Into, { let conn = tonic::transport::Endpoint::new(dst)?.connect().await?; @@ -162,20 +176,25 @@ pub mod registration_client { impl RegistrationClient where T: tonic::client::GrpcService, - T::ResponseBody: Body + Send + Sync + 'static, T::Error: Into, + T::ResponseBody: Body + Send + 'static, ::Error: Into + Send, { pub fn new(inner: T) -> Self { let inner = tonic::client::Grpc::new(inner); Self { inner } } + pub fn with_origin(inner: T, origin: Uri) -> Self { + let inner = tonic::client::Grpc::with_origin(inner, origin); + Self { inner } + } pub fn with_interceptor( inner: T, interceptor: F, ) -> RegistrationClient> where F: tonic::service::Interceptor, + T::ResponseBody: Default, T: tonic::codegen::Service< http::Request, Response = http::Response< @@ -187,23 +206,41 @@ pub mod registration_client { { RegistrationClient::new(InterceptedService::new(inner, interceptor)) } - #[doc = r" Compress requests with `gzip`."] - #[doc = r""] - #[doc = r" This requires the server to support it otherwise it might respond with an"] - #[doc = r" error."] - pub fn send_gzip(mut self) -> Self { - self.inner = self.inner.send_gzip(); + /// Compress requests with the given encoding. + /// + /// This requires the server to support it otherwise it might respond with an + /// error. + #[must_use] + pub fn send_compressed(mut self, encoding: CompressionEncoding) -> Self { + self.inner = self.inner.send_compressed(encoding); + self + } + /// Enable decompressing responses. + #[must_use] + pub fn accept_compressed(mut self, encoding: CompressionEncoding) -> Self { + self.inner = self.inner.accept_compressed(encoding); + self + } + /// Limits the maximum size of a decoded message. + /// + /// Default: `4MB` + #[must_use] + pub fn max_decoding_message_size(mut self, limit: usize) -> Self { + self.inner = self.inner.max_decoding_message_size(limit); self } - #[doc = r" Enable decompressing responses with `gzip`."] - pub fn accept_gzip(mut self) -> Self { - self.inner = self.inner.accept_gzip(); + /// Limits the maximum size of an encoded message. + /// + /// Default: `usize::MAX` + #[must_use] + pub fn max_encoding_message_size(mut self, limit: usize) -> Self { + self.inner = self.inner.max_encoding_message_size(limit); self } pub async fn register( &mut self, request: impl tonic::IntoRequest, - ) -> Result, tonic::Status> { + ) -> std::result::Result, tonic::Status> { self.inner.ready().await.map_err(|e| { tonic::Status::new( tonic::Code::Unknown, @@ -212,24 +249,28 @@ pub mod registration_client { })?; let codec = tonic::codec::ProstCodec::default(); let path = http::uri::PathAndQuery::from_static("/v1beta1.Registration/Register"); - self.inner.unary(request.into_request(), path, codec).await + let mut req = request.into_request(); + req.extensions_mut() + .insert(GrpcMethod::new("v1beta1.Registration", "Register")); + self.inner.unary(req, path, codec).await } } } -#[doc = r" Generated client implementations."] +/// Generated client implementations. pub mod device_plugin_client { #![allow(unused_variables, dead_code, missing_docs, clippy::let_unit_value)] + use tonic::codegen::http::Uri; use tonic::codegen::*; - #[doc = " DevicePlugin is the service advertised by Device Plugins"] + /// DevicePlugin is the service advertised by Device Plugins #[derive(Debug, Clone)] pub struct DevicePluginClient { inner: tonic::client::Grpc, } impl DevicePluginClient { - #[doc = r" Attempt to create a new client by connecting to a given endpoint."] + /// Attempt to create a new client by connecting to a given endpoint. pub async fn connect(dst: D) -> Result where - D: std::convert::TryInto, + D: TryInto, D::Error: Into, { let conn = tonic::transport::Endpoint::new(dst)?.connect().await?; @@ -239,20 +280,25 @@ pub mod device_plugin_client { impl DevicePluginClient where T: tonic::client::GrpcService, - T::ResponseBody: Body + Send + Sync + 'static, T::Error: Into, + T::ResponseBody: Body + Send + 'static, ::Error: Into + Send, { pub fn new(inner: T) -> Self { let inner = tonic::client::Grpc::new(inner); Self { inner } } + pub fn with_origin(inner: T, origin: Uri) -> Self { + let inner = tonic::client::Grpc::with_origin(inner, origin); + Self { inner } + } pub fn with_interceptor( inner: T, interceptor: F, ) -> DevicePluginClient> where F: tonic::service::Interceptor, + T::ResponseBody: Default, T: tonic::codegen::Service< http::Request, Response = http::Response< @@ -264,25 +310,44 @@ pub mod device_plugin_client { { DevicePluginClient::new(InterceptedService::new(inner, interceptor)) } - #[doc = r" Compress requests with `gzip`."] - #[doc = r""] - #[doc = r" This requires the server to support it otherwise it might respond with an"] - #[doc = r" error."] - pub fn send_gzip(mut self) -> Self { - self.inner = self.inner.send_gzip(); + /// Compress requests with the given encoding. + /// + /// This requires the server to support it otherwise it might respond with an + /// error. + #[must_use] + pub fn send_compressed(mut self, encoding: CompressionEncoding) -> Self { + self.inner = self.inner.send_compressed(encoding); + self + } + /// Enable decompressing responses. + #[must_use] + pub fn accept_compressed(mut self, encoding: CompressionEncoding) -> Self { + self.inner = self.inner.accept_compressed(encoding); self } - #[doc = r" Enable decompressing responses with `gzip`."] - pub fn accept_gzip(mut self) -> Self { - self.inner = self.inner.accept_gzip(); + /// Limits the maximum size of a decoded message. + /// + /// Default: `4MB` + #[must_use] + pub fn max_decoding_message_size(mut self, limit: usize) -> Self { + self.inner = self.inner.max_decoding_message_size(limit); self } - #[doc = " GetDevicePluginOptions returns options to be communicated with Device"] - #[doc = " Manager"] + /// Limits the maximum size of an encoded message. + /// + /// Default: `usize::MAX` + #[must_use] + pub fn max_encoding_message_size(mut self, limit: usize) -> Self { + self.inner = self.inner.max_encoding_message_size(limit); + self + } + /// GetDevicePluginOptions returns options to be communicated with Device + /// Manager pub async fn get_device_plugin_options( &mut self, request: impl tonic::IntoRequest, - ) -> Result, tonic::Status> { + ) -> std::result::Result, tonic::Status> + { self.inner.ready().await.map_err(|e| { tonic::Status::new( tonic::Code::Unknown, @@ -293,15 +358,20 @@ pub mod device_plugin_client { let path = http::uri::PathAndQuery::from_static( "/v1beta1.DevicePlugin/GetDevicePluginOptions", ); - self.inner.unary(request.into_request(), path, codec).await + let mut req = request.into_request(); + req.extensions_mut().insert(GrpcMethod::new( + "v1beta1.DevicePlugin", + "GetDevicePluginOptions", + )); + self.inner.unary(req, path, codec).await } - #[doc = " ListAndWatch returns a stream of List of Devices"] - #[doc = " Whenever a Device state change or a Device disapears, ListAndWatch"] - #[doc = " returns the new list"] + /// ListAndWatch returns a stream of List of Devices + /// Whenever a Device state change or a Device disapears, ListAndWatch + /// returns the new list pub async fn list_and_watch( &mut self, request: impl tonic::IntoRequest, - ) -> Result< + ) -> std::result::Result< tonic::Response>, tonic::Status, > { @@ -313,17 +383,18 @@ pub mod device_plugin_client { })?; let codec = tonic::codec::ProstCodec::default(); let path = http::uri::PathAndQuery::from_static("/v1beta1.DevicePlugin/ListAndWatch"); - self.inner - .server_streaming(request.into_request(), path, codec) - .await + let mut req = request.into_request(); + req.extensions_mut() + .insert(GrpcMethod::new("v1beta1.DevicePlugin", "ListAndWatch")); + self.inner.server_streaming(req, path, codec).await } - #[doc = " Allocate is called during container creation so that the Device"] - #[doc = " Plugin can run device specific operations and instruct Kubelet"] - #[doc = " of the steps to make the Device available in the container"] + /// Allocate is called during container creation so that the Device + /// Plugin can run device specific operations and instruct Kubelet + /// of the steps to make the Device available in the container pub async fn allocate( &mut self, request: impl tonic::IntoRequest, - ) -> Result, tonic::Status> { + ) -> std::result::Result, tonic::Status> { self.inner.ready().await.map_err(|e| { tonic::Status::new( tonic::Code::Unknown, @@ -332,15 +403,19 @@ pub mod device_plugin_client { })?; let codec = tonic::codec::ProstCodec::default(); let path = http::uri::PathAndQuery::from_static("/v1beta1.DevicePlugin/Allocate"); - self.inner.unary(request.into_request(), path, codec).await + let mut req = request.into_request(); + req.extensions_mut() + .insert(GrpcMethod::new("v1beta1.DevicePlugin", "Allocate")); + self.inner.unary(req, path, codec).await } - #[doc = " PreStartContainer is called, if indicated by Device Plugin during registeration phase,"] - #[doc = " before each container start. Device plugin can run device specific operations"] - #[doc = " such as reseting the device before making devices available to the container"] + /// PreStartContainer is called, if indicated by Device Plugin during registeration phase, + /// before each container start. Device plugin can run device specific operations + /// such as reseting the device before making devices available to the container pub async fn pre_start_container( &mut self, request: impl tonic::IntoRequest, - ) -> Result, tonic::Status> { + ) -> std::result::Result, tonic::Status> + { self.inner.ready().await.map_err(|e| { tonic::Status::new( tonic::Code::Unknown, @@ -350,43 +425,52 @@ pub mod device_plugin_client { let codec = tonic::codec::ProstCodec::default(); let path = http::uri::PathAndQuery::from_static("/v1beta1.DevicePlugin/PreStartContainer"); - self.inner.unary(request.into_request(), path, codec).await + let mut req = request.into_request(); + req.extensions_mut() + .insert(GrpcMethod::new("v1beta1.DevicePlugin", "PreStartContainer")); + self.inner.unary(req, path, codec).await } } } -#[doc = r" Generated server implementations."] +/// Generated server implementations. pub mod registration_server { #![allow(unused_variables, dead_code, missing_docs, clippy::let_unit_value)] use tonic::codegen::*; - #[doc = "Generated trait containing gRPC methods that should be implemented for use with RegistrationServer."] + /// Generated trait containing gRPC methods that should be implemented for use with RegistrationServer. #[async_trait] pub trait Registration: Send + Sync + 'static { async fn register( &self, request: tonic::Request, - ) -> Result, tonic::Status>; + ) -> std::result::Result, tonic::Status>; } - #[doc = " Registration is the service advertised by the Kubelet"] - #[doc = " Only when Kubelet answers with a success code to a Register Request"] - #[doc = " may Device Plugins start their service"] - #[doc = " Registration may fail when device plugin version is not supported by"] - #[doc = " Kubelet or the registered resourceName is already taken by another"] - #[doc = " active device plugin. Device plugin is expected to terminate upon registration failure"] + /// Registration is the service advertised by the Kubelet + /// Only when Kubelet answers with a success code to a Register Request + /// may Device Plugins start their service + /// Registration may fail when device plugin version is not supported by + /// Kubelet or the registered resourceName is already taken by another + /// active device plugin. Device plugin is expected to terminate upon registration failure #[derive(Debug)] pub struct RegistrationServer { inner: _Inner, - accept_compression_encodings: (), - send_compression_encodings: (), + accept_compression_encodings: EnabledCompressionEncodings, + send_compression_encodings: EnabledCompressionEncodings, + max_decoding_message_size: Option, + max_encoding_message_size: Option, } struct _Inner(Arc); impl RegistrationServer { pub fn new(inner: T) -> Self { - let inner = Arc::new(inner); + Self::from_arc(Arc::new(inner)) + } + pub fn from_arc(inner: Arc) -> Self { let inner = _Inner(inner); Self { inner, accept_compression_encodings: Default::default(), send_compression_encodings: Default::default(), + max_decoding_message_size: None, + max_encoding_message_size: None, } } pub fn with_interceptor(inner: T, interceptor: F) -> InterceptedService @@ -395,17 +479,48 @@ pub mod registration_server { { InterceptedService::new(Self::new(inner), interceptor) } + /// Enable decompressing requests with the given encoding. + #[must_use] + pub fn accept_compressed(mut self, encoding: CompressionEncoding) -> Self { + self.accept_compression_encodings.enable(encoding); + self + } + /// Compress responses with the given encoding, if the client supports it. + #[must_use] + pub fn send_compressed(mut self, encoding: CompressionEncoding) -> Self { + self.send_compression_encodings.enable(encoding); + self + } + /// Limits the maximum size of a decoded message. + /// + /// Default: `4MB` + #[must_use] + pub fn max_decoding_message_size(mut self, limit: usize) -> Self { + self.max_decoding_message_size = Some(limit); + self + } + /// Limits the maximum size of an encoded message. + /// + /// Default: `usize::MAX` + #[must_use] + pub fn max_encoding_message_size(mut self, limit: usize) -> Self { + self.max_encoding_message_size = Some(limit); + self + } } impl tonic::codegen::Service> for RegistrationServer where T: Registration, - B: Body + Send + Sync + 'static, + B: Body + Send + 'static, B::Error: Into + Send + 'static, { type Response = http::Response; - type Error = Never; + type Error = std::convert::Infallible; type Future = BoxFuture; - fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll> { + fn poll_ready( + &mut self, + _cx: &mut Context<'_>, + ) -> Poll> { Poll::Ready(Ok(())) } fn call(&mut self, req: http::Request) -> Self::Future { @@ -421,22 +536,30 @@ pub mod registration_server { &mut self, request: tonic::Request, ) -> Self::Future { - let inner = self.0.clone(); - let fut = async move { (*inner).register(request).await }; + let inner = Arc::clone(&self.0); + let fut = + async move { ::register(&inner, request).await }; Box::pin(fut) } } let accept_compression_encodings = self.accept_compression_encodings; let send_compression_encodings = self.send_compression_encodings; + let max_decoding_message_size = self.max_decoding_message_size; + let max_encoding_message_size = self.max_encoding_message_size; let inner = self.inner.clone(); let fut = async move { let inner = inner.0; let method = RegisterSvc(inner); let codec = tonic::codec::ProstCodec::default(); - let mut grpc = tonic::server::Grpc::new(codec).apply_compression_config( - accept_compression_encodings, - send_compression_encodings, - ); + let mut grpc = tonic::server::Grpc::new(codec) + .apply_compression_config( + accept_compression_encodings, + send_compression_encodings, + ) + .apply_max_message_size_config( + max_decoding_message_size, + max_encoding_message_size, + ); let res = grpc.unary(method, req).await; Ok(res) }; @@ -460,12 +583,14 @@ pub mod registration_server { inner, accept_compression_encodings: self.accept_compression_encodings, send_compression_encodings: self.send_compression_encodings, + max_decoding_message_size: self.max_decoding_message_size, + max_encoding_message_size: self.max_encoding_message_size, } } } impl Clone for _Inner { fn clone(&self) -> Self { - Self(self.0.clone()) + Self(Arc::clone(&self.0)) } } impl std::fmt::Debug for _Inner { @@ -473,66 +598,72 @@ pub mod registration_server { write!(f, "{:?}", self.0) } } - impl tonic::transport::NamedService for RegistrationServer { + impl tonic::server::NamedService for RegistrationServer { const NAME: &'static str = "v1beta1.Registration"; } } -#[doc = r" Generated server implementations."] +/// Generated server implementations. pub mod device_plugin_server { #![allow(unused_variables, dead_code, missing_docs, clippy::let_unit_value)] use tonic::codegen::*; - #[doc = "Generated trait containing gRPC methods that should be implemented for use with DevicePluginServer."] + /// Generated trait containing gRPC methods that should be implemented for use with DevicePluginServer. #[async_trait] pub trait DevicePlugin: Send + Sync + 'static { - #[doc = " GetDevicePluginOptions returns options to be communicated with Device"] - #[doc = " Manager"] + /// GetDevicePluginOptions returns options to be communicated with Device + /// Manager async fn get_device_plugin_options( &self, request: tonic::Request, - ) -> Result, tonic::Status>; - #[doc = "Server streaming response type for the ListAndWatch method."] - type ListAndWatchStream: futures_core::Stream> - + Send - + Sync + ) -> std::result::Result, tonic::Status>; + /// Server streaming response type for the ListAndWatch method. + type ListAndWatchStream: tonic::codegen::tokio_stream::Stream< + Item = std::result::Result, + > + Send + 'static; - #[doc = " ListAndWatch returns a stream of List of Devices"] - #[doc = " Whenever a Device state change or a Device disapears, ListAndWatch"] - #[doc = " returns the new list"] + /// ListAndWatch returns a stream of List of Devices + /// Whenever a Device state change or a Device disapears, ListAndWatch + /// returns the new list async fn list_and_watch( &self, request: tonic::Request, - ) -> Result, tonic::Status>; - #[doc = " Allocate is called during container creation so that the Device"] - #[doc = " Plugin can run device specific operations and instruct Kubelet"] - #[doc = " of the steps to make the Device available in the container"] + ) -> std::result::Result, tonic::Status>; + /// Allocate is called during container creation so that the Device + /// Plugin can run device specific operations and instruct Kubelet + /// of the steps to make the Device available in the container async fn allocate( &self, request: tonic::Request, - ) -> Result, tonic::Status>; - #[doc = " PreStartContainer is called, if indicated by Device Plugin during registeration phase,"] - #[doc = " before each container start. Device plugin can run device specific operations"] - #[doc = " such as reseting the device before making devices available to the container"] + ) -> std::result::Result, tonic::Status>; + /// PreStartContainer is called, if indicated by Device Plugin during registeration phase, + /// before each container start. Device plugin can run device specific operations + /// such as reseting the device before making devices available to the container async fn pre_start_container( &self, request: tonic::Request, - ) -> Result, tonic::Status>; + ) -> std::result::Result, tonic::Status>; } - #[doc = " DevicePlugin is the service advertised by Device Plugins"] + /// DevicePlugin is the service advertised by Device Plugins #[derive(Debug)] pub struct DevicePluginServer { inner: _Inner, - accept_compression_encodings: (), - send_compression_encodings: (), + accept_compression_encodings: EnabledCompressionEncodings, + send_compression_encodings: EnabledCompressionEncodings, + max_decoding_message_size: Option, + max_encoding_message_size: Option, } struct _Inner(Arc); impl DevicePluginServer { pub fn new(inner: T) -> Self { - let inner = Arc::new(inner); + Self::from_arc(Arc::new(inner)) + } + pub fn from_arc(inner: Arc) -> Self { let inner = _Inner(inner); Self { inner, accept_compression_encodings: Default::default(), send_compression_encodings: Default::default(), + max_decoding_message_size: None, + max_encoding_message_size: None, } } pub fn with_interceptor(inner: T, interceptor: F) -> InterceptedService @@ -541,17 +672,48 @@ pub mod device_plugin_server { { InterceptedService::new(Self::new(inner), interceptor) } + /// Enable decompressing requests with the given encoding. + #[must_use] + pub fn accept_compressed(mut self, encoding: CompressionEncoding) -> Self { + self.accept_compression_encodings.enable(encoding); + self + } + /// Compress responses with the given encoding, if the client supports it. + #[must_use] + pub fn send_compressed(mut self, encoding: CompressionEncoding) -> Self { + self.send_compression_encodings.enable(encoding); + self + } + /// Limits the maximum size of a decoded message. + /// + /// Default: `4MB` + #[must_use] + pub fn max_decoding_message_size(mut self, limit: usize) -> Self { + self.max_decoding_message_size = Some(limit); + self + } + /// Limits the maximum size of an encoded message. + /// + /// Default: `usize::MAX` + #[must_use] + pub fn max_encoding_message_size(mut self, limit: usize) -> Self { + self.max_encoding_message_size = Some(limit); + self + } } impl tonic::codegen::Service> for DevicePluginServer where T: DevicePlugin, - B: Body + Send + Sync + 'static, + B: Body + Send + 'static, B::Error: Into + Send + 'static, { type Response = http::Response; - type Error = Never; + type Error = std::convert::Infallible; type Future = BoxFuture; - fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll> { + fn poll_ready( + &mut self, + _cx: &mut Context<'_>, + ) -> Poll> { Poll::Ready(Ok(())) } fn call(&mut self, req: http::Request) -> Self::Future { @@ -564,23 +726,32 @@ pub mod device_plugin_server { type Response = super::DevicePluginOptions; type Future = BoxFuture, tonic::Status>; fn call(&mut self, request: tonic::Request) -> Self::Future { - let inner = self.0.clone(); - let fut = - async move { (*inner).get_device_plugin_options(request).await }; + let inner = Arc::clone(&self.0); + let fut = async move { + ::get_device_plugin_options(&inner, request) + .await + }; Box::pin(fut) } } let accept_compression_encodings = self.accept_compression_encodings; let send_compression_encodings = self.send_compression_encodings; + let max_decoding_message_size = self.max_decoding_message_size; + let max_encoding_message_size = self.max_encoding_message_size; let inner = self.inner.clone(); let fut = async move { let inner = inner.0; let method = GetDevicePluginOptionsSvc(inner); let codec = tonic::codec::ProstCodec::default(); - let mut grpc = tonic::server::Grpc::new(codec).apply_compression_config( - accept_compression_encodings, - send_compression_encodings, - ); + let mut grpc = tonic::server::Grpc::new(codec) + .apply_compression_config( + accept_compression_encodings, + send_compression_encodings, + ) + .apply_max_message_size_config( + max_decoding_message_size, + max_encoding_message_size, + ); let res = grpc.unary(method, req).await; Ok(res) }; @@ -595,22 +766,31 @@ pub mod device_plugin_server { type Future = BoxFuture, tonic::Status>; fn call(&mut self, request: tonic::Request) -> Self::Future { - let inner = self.0.clone(); - let fut = async move { (*inner).list_and_watch(request).await }; + let inner = Arc::clone(&self.0); + let fut = async move { + ::list_and_watch(&inner, request).await + }; Box::pin(fut) } } let accept_compression_encodings = self.accept_compression_encodings; let send_compression_encodings = self.send_compression_encodings; + let max_decoding_message_size = self.max_decoding_message_size; + let max_encoding_message_size = self.max_encoding_message_size; let inner = self.inner.clone(); let fut = async move { let inner = inner.0; let method = ListAndWatchSvc(inner); let codec = tonic::codec::ProstCodec::default(); - let mut grpc = tonic::server::Grpc::new(codec).apply_compression_config( - accept_compression_encodings, - send_compression_encodings, - ); + let mut grpc = tonic::server::Grpc::new(codec) + .apply_compression_config( + accept_compression_encodings, + send_compression_encodings, + ) + .apply_max_message_size_config( + max_decoding_message_size, + max_encoding_message_size, + ); let res = grpc.server_streaming(method, req).await; Ok(res) }; @@ -626,22 +806,30 @@ pub mod device_plugin_server { &mut self, request: tonic::Request, ) -> Self::Future { - let inner = self.0.clone(); - let fut = async move { (*inner).allocate(request).await }; + let inner = Arc::clone(&self.0); + let fut = + async move { ::allocate(&inner, request).await }; Box::pin(fut) } } let accept_compression_encodings = self.accept_compression_encodings; let send_compression_encodings = self.send_compression_encodings; + let max_decoding_message_size = self.max_decoding_message_size; + let max_encoding_message_size = self.max_encoding_message_size; let inner = self.inner.clone(); let fut = async move { let inner = inner.0; let method = AllocateSvc(inner); let codec = tonic::codec::ProstCodec::default(); - let mut grpc = tonic::server::Grpc::new(codec).apply_compression_config( - accept_compression_encodings, - send_compression_encodings, - ); + let mut grpc = tonic::server::Grpc::new(codec) + .apply_compression_config( + accept_compression_encodings, + send_compression_encodings, + ) + .apply_max_message_size_config( + max_decoding_message_size, + max_encoding_message_size, + ); let res = grpc.unary(method, req).await; Ok(res) }; @@ -660,22 +848,31 @@ pub mod device_plugin_server { &mut self, request: tonic::Request, ) -> Self::Future { - let inner = self.0.clone(); - let fut = async move { (*inner).pre_start_container(request).await }; + let inner = Arc::clone(&self.0); + let fut = async move { + ::pre_start_container(&inner, request).await + }; Box::pin(fut) } } let accept_compression_encodings = self.accept_compression_encodings; let send_compression_encodings = self.send_compression_encodings; + let max_decoding_message_size = self.max_decoding_message_size; + let max_encoding_message_size = self.max_encoding_message_size; let inner = self.inner.clone(); let fut = async move { let inner = inner.0; let method = PreStartContainerSvc(inner); let codec = tonic::codec::ProstCodec::default(); - let mut grpc = tonic::server::Grpc::new(codec).apply_compression_config( - accept_compression_encodings, - send_compression_encodings, - ); + let mut grpc = tonic::server::Grpc::new(codec) + .apply_compression_config( + accept_compression_encodings, + send_compression_encodings, + ) + .apply_max_message_size_config( + max_decoding_message_size, + max_encoding_message_size, + ); let res = grpc.unary(method, req).await; Ok(res) }; @@ -699,12 +896,14 @@ pub mod device_plugin_server { inner, accept_compression_encodings: self.accept_compression_encodings, send_compression_encodings: self.send_compression_encodings, + max_decoding_message_size: self.max_decoding_message_size, + max_encoding_message_size: self.max_encoding_message_size, } } } impl Clone for _Inner { fn clone(&self) -> Self { - Self(self.0.clone()) + Self(Arc::clone(&self.0)) } } impl std::fmt::Debug for _Inner { @@ -712,7 +911,7 @@ pub mod device_plugin_server { write!(f, "{:?}", self.0) } } - impl tonic::transport::NamedService for DevicePluginServer { + impl tonic::server::NamedService for DevicePluginServer { const NAME: &'static str = "v1beta1.DevicePlugin"; } } diff --git a/discovery-handlers/debug-echo/Cargo.toml b/discovery-handlers/debug-echo/Cargo.toml index e83f22906..0bed32719 100644 --- a/discovery-handlers/debug-echo/Cargo.toml +++ b/discovery-handlers/debug-echo/Cargo.toml @@ -16,7 +16,7 @@ serde = "1.0.104" serde_derive = "1.0.104" tokio = { version = "1.0.1", features = ["time", "net", "sync"] } tokio-stream = { version = "0.1", features = ["net"] } -tonic = { version = "0.5.2", features = ["tls"] } +tonic = { version = "0.10", features = ["tls"] } [dev-dependencies] akri-shared = { path = "../../shared" } diff --git a/discovery-handlers/onvif/Cargo.toml b/discovery-handlers/onvif/Cargo.toml index d8554abc2..33920290e 100644 --- a/discovery-handlers/onvif/Cargo.toml +++ b/discovery-handlers/onvif/Cargo.toml @@ -26,7 +26,7 @@ sxd-document = "0.3.0" sxd-xpath = "0.4.0" tokio = { version = "1.0", features = ["time", "net", "sync"] } tokio-stream = { version = "0.1", features = ["net"] } -tonic = { version = "0.5.2", features = ["tls"] } +tonic = { version = "0.10", features = ["tls"] } uuid = { version = "0.8.1", features = ["v4"] } yaserde = "0.7.1" yaserde_derive = "0.7.1" diff --git a/discovery-handlers/opcua/Cargo.toml b/discovery-handlers/opcua/Cargo.toml index 4328fde02..e3d6a3df9 100644 --- a/discovery-handlers/opcua/Cargo.toml +++ b/discovery-handlers/opcua/Cargo.toml @@ -18,7 +18,7 @@ serde = "1.0.104" serde_derive = "1.0.1" tokio = { version = "1.0.2", features = ["time", "net", "sync"] } tokio-stream = { version = "0.1", features = ["net"] } -tonic = { version = "0.5.2", features = ["tls"] } +tonic = { version = "0.10", features = ["tls"] } url = "2.2.0" [dev-dependencies] diff --git a/discovery-handlers/udev/Cargo.toml b/discovery-handlers/udev/Cargo.toml index 00e76bc1c..a01a4aa30 100644 --- a/discovery-handlers/udev/Cargo.toml +++ b/discovery-handlers/udev/Cargo.toml @@ -20,7 +20,7 @@ serde = "1.0.104" serde_derive = "1.0.104" tokio = { version = "1.0", features = ["time", "net", "sync"] } tokio-stream = { version = "0.1", features = ["net"] } -tonic = { version = "0.5.2", features = ["tls"] } +tonic = { version = "0.10", features = ["tls"] } udev = "0.5" [dev-dependencies] diff --git a/discovery-utils/Cargo.toml b/discovery-utils/Cargo.toml index bce9d7d24..0423be9b7 100644 --- a/discovery-utils/Cargo.toml +++ b/discovery-utils/Cargo.toml @@ -15,14 +15,14 @@ anyhow = "1.0.38" async-trait = { version = "0.1.0", optional = true } futures = { version = "0.3.1", package = "futures" } log = "0.4" -prost = "0.8" +prost = "0.12" serde = "1.0" serde_derive = "1.0" serde_yaml = "0.8.11" tempfile = { version = "3.1.0", optional = true } tokio = { version = "1.0.1", features = ["time", "net", "sync"] } tokio-stream = { version = "0.1", features = ["net"] } -tonic = { version = "0.5.2", features = ["tls"] } +tonic = { version = "0.10", features = ["tls"] } tower = "0.4.8" [features] @@ -33,4 +33,4 @@ async-trait = "0.1.0" tempfile = "3.1.0" [build-dependencies] -tonic-build = "0.5.2" +tonic-build = "0.10" diff --git a/discovery-utils/src/discovery/v0.rs b/discovery-utils/src/discovery/v0.rs index 1e3a7d849..cc60bfcb8 100644 --- a/discovery-utils/src/discovery/v0.rs +++ b/discovery-utils/src/discovery/v0.rs @@ -1,3 +1,4 @@ +#[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] pub struct RegisterDiscoveryHandlerRequest { /// Name of the `DiscoveryHandler`. This name is specified in an @@ -26,14 +27,37 @@ pub mod register_discovery_handler_request { Uds = 0, Network = 1, } + impl EndpointType { + /// String value of the enum field names used in the ProtoBuf definition. + /// + /// The values are not transformed in any way and thus are considered stable + /// (if the ProtoBuf definition does not change) and safe for programmatic use. + pub fn as_str_name(&self) -> &'static str { + match self { + EndpointType::Uds => "UDS", + EndpointType::Network => "NETWORK", + } + } + /// Creates an enum from field names used in the ProtoBuf definition. + pub fn from_str_name(value: &str) -> ::core::option::Option { + match value { + "UDS" => Some(Self::Uds), + "NETWORK" => Some(Self::Network), + _ => None, + } + } + } } +#[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] pub struct Empty {} +#[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] pub struct ByteData { #[prost(bytes = "vec", optional, tag = "1")] pub vec: ::core::option::Option<::prost::alloc::vec::Vec>, } +#[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] pub struct DiscoverRequest { /// String containing all the details (such as filtering options) @@ -45,12 +69,14 @@ pub struct DiscoverRequest { #[prost(map = "string, message", tag = "2")] pub discovery_properties: ::std::collections::HashMap<::prost::alloc::string::String, ByteData>, } +#[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] pub struct DiscoverResponse { /// List of discovered devices #[prost(message, repeated, tag = "1")] pub devices: ::prost::alloc::vec::Vec, } +#[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] pub struct Device { /// Identifier for this device @@ -72,6 +98,7 @@ pub struct Device { /// From Device Plugin API /// Mount specifies a host volume to mount into a container. /// where device library or tools are installed on host and container +#[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] pub struct Mount { /// Path of the mount within the container. @@ -86,6 +113,7 @@ pub struct Mount { } /// From Device Plugin API /// DeviceSpec specifies a host device to mount into a container. +#[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] pub struct DeviceSpec { /// Path of the device within the container. @@ -101,21 +129,22 @@ pub struct DeviceSpec { #[prost(string, tag = "3")] pub permissions: ::prost::alloc::string::String, } -#[doc = r" Generated client implementations."] +/// Generated client implementations. pub mod registration_client { #![allow(unused_variables, dead_code, missing_docs, clippy::let_unit_value)] + use tonic::codegen::http::Uri; use tonic::codegen::*; - #[doc = " Registration is the service advertised by the Akri Agent."] - #[doc = " Any `DiscoveryHandler` can register with the Akri Agent."] + /// Registration is the service advertised by the Akri Agent. + /// Any `DiscoveryHandler` can register with the Akri Agent. #[derive(Debug, Clone)] pub struct RegistrationClient { inner: tonic::client::Grpc, } impl RegistrationClient { - #[doc = r" Attempt to create a new client by connecting to a given endpoint."] + /// Attempt to create a new client by connecting to a given endpoint. pub async fn connect(dst: D) -> Result where - D: std::convert::TryInto, + D: TryInto, D::Error: Into, { let conn = tonic::transport::Endpoint::new(dst)?.connect().await?; @@ -125,20 +154,25 @@ pub mod registration_client { impl RegistrationClient where T: tonic::client::GrpcService, - T::ResponseBody: Body + Send + Sync + 'static, T::Error: Into, + T::ResponseBody: Body + Send + 'static, ::Error: Into + Send, { pub fn new(inner: T) -> Self { let inner = tonic::client::Grpc::new(inner); Self { inner } } + pub fn with_origin(inner: T, origin: Uri) -> Self { + let inner = tonic::client::Grpc::with_origin(inner, origin); + Self { inner } + } pub fn with_interceptor( inner: T, interceptor: F, ) -> RegistrationClient> where F: tonic::service::Interceptor, + T::ResponseBody: Default, T: tonic::codegen::Service< http::Request, Response = http::Response< @@ -150,23 +184,41 @@ pub mod registration_client { { RegistrationClient::new(InterceptedService::new(inner, interceptor)) } - #[doc = r" Compress requests with `gzip`."] - #[doc = r""] - #[doc = r" This requires the server to support it otherwise it might respond with an"] - #[doc = r" error."] - pub fn send_gzip(mut self) -> Self { - self.inner = self.inner.send_gzip(); + /// Compress requests with the given encoding. + /// + /// This requires the server to support it otherwise it might respond with an + /// error. + #[must_use] + pub fn send_compressed(mut self, encoding: CompressionEncoding) -> Self { + self.inner = self.inner.send_compressed(encoding); self } - #[doc = r" Enable decompressing responses with `gzip`."] - pub fn accept_gzip(mut self) -> Self { - self.inner = self.inner.accept_gzip(); + /// Enable decompressing responses. + #[must_use] + pub fn accept_compressed(mut self, encoding: CompressionEncoding) -> Self { + self.inner = self.inner.accept_compressed(encoding); + self + } + /// Limits the maximum size of a decoded message. + /// + /// Default: `4MB` + #[must_use] + pub fn max_decoding_message_size(mut self, limit: usize) -> Self { + self.inner = self.inner.max_decoding_message_size(limit); + self + } + /// Limits the maximum size of an encoded message. + /// + /// Default: `usize::MAX` + #[must_use] + pub fn max_encoding_message_size(mut self, limit: usize) -> Self { + self.inner = self.inner.max_encoding_message_size(limit); self } pub async fn register_discovery_handler( &mut self, request: impl tonic::IntoRequest, - ) -> Result, tonic::Status> { + ) -> std::result::Result, tonic::Status> { self.inner.ready().await.map_err(|e| { tonic::Status::new( tonic::Code::Unknown, @@ -176,23 +228,29 @@ pub mod registration_client { let codec = tonic::codec::ProstCodec::default(); let path = http::uri::PathAndQuery::from_static("/v0.Registration/RegisterDiscoveryHandler"); - self.inner.unary(request.into_request(), path, codec).await + let mut req = request.into_request(); + req.extensions_mut().insert(GrpcMethod::new( + "v0.Registration", + "RegisterDiscoveryHandler", + )); + self.inner.unary(req, path, codec).await } } } -#[doc = r" Generated client implementations."] +/// Generated client implementations. pub mod discovery_handler_client { #![allow(unused_variables, dead_code, missing_docs, clippy::let_unit_value)] + use tonic::codegen::http::Uri; use tonic::codegen::*; #[derive(Debug, Clone)] pub struct DiscoveryHandlerClient { inner: tonic::client::Grpc, } impl DiscoveryHandlerClient { - #[doc = r" Attempt to create a new client by connecting to a given endpoint."] + /// Attempt to create a new client by connecting to a given endpoint. pub async fn connect(dst: D) -> Result where - D: std::convert::TryInto, + D: TryInto, D::Error: Into, { let conn = tonic::transport::Endpoint::new(dst)?.connect().await?; @@ -202,20 +260,25 @@ pub mod discovery_handler_client { impl DiscoveryHandlerClient where T: tonic::client::GrpcService, - T::ResponseBody: Body + Send + Sync + 'static, T::Error: Into, + T::ResponseBody: Body + Send + 'static, ::Error: Into + Send, { pub fn new(inner: T) -> Self { let inner = tonic::client::Grpc::new(inner); Self { inner } } + pub fn with_origin(inner: T, origin: Uri) -> Self { + let inner = tonic::client::Grpc::with_origin(inner, origin); + Self { inner } + } pub fn with_interceptor( inner: T, interceptor: F, ) -> DiscoveryHandlerClient> where F: tonic::service::Interceptor, + T::ResponseBody: Default, T: tonic::codegen::Service< http::Request, Response = http::Response< @@ -227,24 +290,44 @@ pub mod discovery_handler_client { { DiscoveryHandlerClient::new(InterceptedService::new(inner, interceptor)) } - #[doc = r" Compress requests with `gzip`."] - #[doc = r""] - #[doc = r" This requires the server to support it otherwise it might respond with an"] - #[doc = r" error."] - pub fn send_gzip(mut self) -> Self { - self.inner = self.inner.send_gzip(); + /// Compress requests with the given encoding. + /// + /// This requires the server to support it otherwise it might respond with an + /// error. + #[must_use] + pub fn send_compressed(mut self, encoding: CompressionEncoding) -> Self { + self.inner = self.inner.send_compressed(encoding); + self + } + /// Enable decompressing responses. + #[must_use] + pub fn accept_compressed(mut self, encoding: CompressionEncoding) -> Self { + self.inner = self.inner.accept_compressed(encoding); self } - #[doc = r" Enable decompressing responses with `gzip`."] - pub fn accept_gzip(mut self) -> Self { - self.inner = self.inner.accept_gzip(); + /// Limits the maximum size of a decoded message. + /// + /// Default: `4MB` + #[must_use] + pub fn max_decoding_message_size(mut self, limit: usize) -> Self { + self.inner = self.inner.max_decoding_message_size(limit); + self + } + /// Limits the maximum size of an encoded message. + /// + /// Default: `usize::MAX` + #[must_use] + pub fn max_encoding_message_size(mut self, limit: usize) -> Self { + self.inner = self.inner.max_encoding_message_size(limit); self } pub async fn discover( &mut self, request: impl tonic::IntoRequest, - ) -> Result>, tonic::Status> - { + ) -> std::result::Result< + tonic::Response>, + tonic::Status, + > { self.inner.ready().await.map_err(|e| { tonic::Status::new( tonic::Code::Unknown, @@ -253,41 +336,48 @@ pub mod discovery_handler_client { })?; let codec = tonic::codec::ProstCodec::default(); let path = http::uri::PathAndQuery::from_static("/v0.DiscoveryHandler/Discover"); - self.inner - .server_streaming(request.into_request(), path, codec) - .await + let mut req = request.into_request(); + req.extensions_mut() + .insert(GrpcMethod::new("v0.DiscoveryHandler", "Discover")); + self.inner.server_streaming(req, path, codec).await } } } -#[doc = r" Generated server implementations."] +/// Generated server implementations. pub mod registration_server { #![allow(unused_variables, dead_code, missing_docs, clippy::let_unit_value)] use tonic::codegen::*; - #[doc = "Generated trait containing gRPC methods that should be implemented for use with RegistrationServer."] + /// Generated trait containing gRPC methods that should be implemented for use with RegistrationServer. #[async_trait] pub trait Registration: Send + Sync + 'static { async fn register_discovery_handler( &self, request: tonic::Request, - ) -> Result, tonic::Status>; + ) -> std::result::Result, tonic::Status>; } - #[doc = " Registration is the service advertised by the Akri Agent."] - #[doc = " Any `DiscoveryHandler` can register with the Akri Agent."] + /// Registration is the service advertised by the Akri Agent. + /// Any `DiscoveryHandler` can register with the Akri Agent. #[derive(Debug)] pub struct RegistrationServer { inner: _Inner, - accept_compression_encodings: (), - send_compression_encodings: (), + accept_compression_encodings: EnabledCompressionEncodings, + send_compression_encodings: EnabledCompressionEncodings, + max_decoding_message_size: Option, + max_encoding_message_size: Option, } struct _Inner(Arc); impl RegistrationServer { pub fn new(inner: T) -> Self { - let inner = Arc::new(inner); + Self::from_arc(Arc::new(inner)) + } + pub fn from_arc(inner: Arc) -> Self { let inner = _Inner(inner); Self { inner, accept_compression_encodings: Default::default(), send_compression_encodings: Default::default(), + max_decoding_message_size: None, + max_encoding_message_size: None, } } pub fn with_interceptor(inner: T, interceptor: F) -> InterceptedService @@ -296,17 +386,48 @@ pub mod registration_server { { InterceptedService::new(Self::new(inner), interceptor) } + /// Enable decompressing requests with the given encoding. + #[must_use] + pub fn accept_compressed(mut self, encoding: CompressionEncoding) -> Self { + self.accept_compression_encodings.enable(encoding); + self + } + /// Compress responses with the given encoding, if the client supports it. + #[must_use] + pub fn send_compressed(mut self, encoding: CompressionEncoding) -> Self { + self.send_compression_encodings.enable(encoding); + self + } + /// Limits the maximum size of a decoded message. + /// + /// Default: `4MB` + #[must_use] + pub fn max_decoding_message_size(mut self, limit: usize) -> Self { + self.max_decoding_message_size = Some(limit); + self + } + /// Limits the maximum size of an encoded message. + /// + /// Default: `usize::MAX` + #[must_use] + pub fn max_encoding_message_size(mut self, limit: usize) -> Self { + self.max_encoding_message_size = Some(limit); + self + } } impl tonic::codegen::Service> for RegistrationServer where T: Registration, - B: Body + Send + Sync + 'static, + B: Body + Send + 'static, B::Error: Into + Send + 'static, { type Response = http::Response; - type Error = Never; + type Error = std::convert::Infallible; type Future = BoxFuture; - fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll> { + fn poll_ready( + &mut self, + _cx: &mut Context<'_>, + ) -> Poll> { Poll::Ready(Ok(())) } fn call(&mut self, req: http::Request) -> Self::Future { @@ -325,23 +446,32 @@ pub mod registration_server { &mut self, request: tonic::Request, ) -> Self::Future { - let inner = self.0.clone(); - let fut = - async move { (*inner).register_discovery_handler(request).await }; + let inner = Arc::clone(&self.0); + let fut = async move { + ::register_discovery_handler(&inner, request) + .await + }; Box::pin(fut) } } let accept_compression_encodings = self.accept_compression_encodings; let send_compression_encodings = self.send_compression_encodings; + let max_decoding_message_size = self.max_decoding_message_size; + let max_encoding_message_size = self.max_encoding_message_size; let inner = self.inner.clone(); let fut = async move { let inner = inner.0; let method = RegisterDiscoveryHandlerSvc(inner); let codec = tonic::codec::ProstCodec::default(); - let mut grpc = tonic::server::Grpc::new(codec).apply_compression_config( - accept_compression_encodings, - send_compression_encodings, - ); + let mut grpc = tonic::server::Grpc::new(codec) + .apply_compression_config( + accept_compression_encodings, + send_compression_encodings, + ) + .apply_max_message_size_config( + max_decoding_message_size, + max_encoding_message_size, + ); let res = grpc.unary(method, req).await; Ok(res) }; @@ -365,12 +495,14 @@ pub mod registration_server { inner, accept_compression_encodings: self.accept_compression_encodings, send_compression_encodings: self.send_compression_encodings, + max_decoding_message_size: self.max_decoding_message_size, + max_encoding_message_size: self.max_encoding_message_size, } } } impl Clone for _Inner { fn clone(&self) -> Self { - Self(self.0.clone()) + Self(Arc::clone(&self.0)) } } impl std::fmt::Debug for _Inner { @@ -378,42 +510,48 @@ pub mod registration_server { write!(f, "{:?}", self.0) } } - impl tonic::transport::NamedService for RegistrationServer { + impl tonic::server::NamedService for RegistrationServer { const NAME: &'static str = "v0.Registration"; } } -#[doc = r" Generated server implementations."] +/// Generated server implementations. pub mod discovery_handler_server { #![allow(unused_variables, dead_code, missing_docs, clippy::let_unit_value)] use tonic::codegen::*; - #[doc = "Generated trait containing gRPC methods that should be implemented for use with DiscoveryHandlerServer."] + /// Generated trait containing gRPC methods that should be implemented for use with DiscoveryHandlerServer. #[async_trait] pub trait DiscoveryHandler: Send + Sync + 'static { - #[doc = "Server streaming response type for the Discover method."] - type DiscoverStream: futures_core::Stream> - + Send - + Sync + /// Server streaming response type for the Discover method. + type DiscoverStream: tonic::codegen::tokio_stream::Stream< + Item = std::result::Result, + > + Send + 'static; async fn discover( &self, request: tonic::Request, - ) -> Result, tonic::Status>; + ) -> std::result::Result, tonic::Status>; } #[derive(Debug)] pub struct DiscoveryHandlerServer { inner: _Inner, - accept_compression_encodings: (), - send_compression_encodings: (), + accept_compression_encodings: EnabledCompressionEncodings, + send_compression_encodings: EnabledCompressionEncodings, + max_decoding_message_size: Option, + max_encoding_message_size: Option, } struct _Inner(Arc); impl DiscoveryHandlerServer { pub fn new(inner: T) -> Self { - let inner = Arc::new(inner); + Self::from_arc(Arc::new(inner)) + } + pub fn from_arc(inner: Arc) -> Self { let inner = _Inner(inner); Self { inner, accept_compression_encodings: Default::default(), send_compression_encodings: Default::default(), + max_decoding_message_size: None, + max_encoding_message_size: None, } } pub fn with_interceptor(inner: T, interceptor: F) -> InterceptedService @@ -422,17 +560,48 @@ pub mod discovery_handler_server { { InterceptedService::new(Self::new(inner), interceptor) } + /// Enable decompressing requests with the given encoding. + #[must_use] + pub fn accept_compressed(mut self, encoding: CompressionEncoding) -> Self { + self.accept_compression_encodings.enable(encoding); + self + } + /// Compress responses with the given encoding, if the client supports it. + #[must_use] + pub fn send_compressed(mut self, encoding: CompressionEncoding) -> Self { + self.send_compression_encodings.enable(encoding); + self + } + /// Limits the maximum size of a decoded message. + /// + /// Default: `4MB` + #[must_use] + pub fn max_decoding_message_size(mut self, limit: usize) -> Self { + self.max_decoding_message_size = Some(limit); + self + } + /// Limits the maximum size of an encoded message. + /// + /// Default: `usize::MAX` + #[must_use] + pub fn max_encoding_message_size(mut self, limit: usize) -> Self { + self.max_encoding_message_size = Some(limit); + self + } } impl tonic::codegen::Service> for DiscoveryHandlerServer where T: DiscoveryHandler, - B: Body + Send + Sync + 'static, + B: Body + Send + 'static, B::Error: Into + Send + 'static, { type Response = http::Response; - type Error = Never; + type Error = std::convert::Infallible; type Future = BoxFuture; - fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll> { + fn poll_ready( + &mut self, + _cx: &mut Context<'_>, + ) -> Poll> { Poll::Ready(Ok(())) } fn call(&mut self, req: http::Request) -> Self::Future { @@ -453,22 +622,31 @@ pub mod discovery_handler_server { &mut self, request: tonic::Request, ) -> Self::Future { - let inner = self.0.clone(); - let fut = async move { (*inner).discover(request).await }; + let inner = Arc::clone(&self.0); + let fut = async move { + ::discover(&inner, request).await + }; Box::pin(fut) } } let accept_compression_encodings = self.accept_compression_encodings; let send_compression_encodings = self.send_compression_encodings; + let max_decoding_message_size = self.max_decoding_message_size; + let max_encoding_message_size = self.max_encoding_message_size; let inner = self.inner.clone(); let fut = async move { let inner = inner.0; let method = DiscoverSvc(inner); let codec = tonic::codec::ProstCodec::default(); - let mut grpc = tonic::server::Grpc::new(codec).apply_compression_config( - accept_compression_encodings, - send_compression_encodings, - ); + let mut grpc = tonic::server::Grpc::new(codec) + .apply_compression_config( + accept_compression_encodings, + send_compression_encodings, + ) + .apply_max_message_size_config( + max_decoding_message_size, + max_encoding_message_size, + ); let res = grpc.server_streaming(method, req).await; Ok(res) }; @@ -492,12 +670,14 @@ pub mod discovery_handler_server { inner, accept_compression_encodings: self.accept_compression_encodings, send_compression_encodings: self.send_compression_encodings, + max_decoding_message_size: self.max_decoding_message_size, + max_encoding_message_size: self.max_encoding_message_size, } } } impl Clone for _Inner { fn clone(&self) -> Self { - Self(self.0.clone()) + Self(Arc::clone(&self.0)) } } impl std::fmt::Debug for _Inner { @@ -505,7 +685,7 @@ pub mod discovery_handler_server { write!(f, "{:?}", self.0) } } - impl tonic::transport::NamedService for DiscoveryHandlerServer { + impl tonic::server::NamedService for DiscoveryHandlerServer { const NAME: &'static str = "v0.DiscoveryHandler"; } } diff --git a/samples/brokers/udev-video-broker/Cargo.toml b/samples/brokers/udev-video-broker/Cargo.toml index 266d285b5..a47b3ff08 100644 --- a/samples/brokers/udev-video-broker/Cargo.toml +++ b/samples/brokers/udev-video-broker/Cargo.toml @@ -14,11 +14,11 @@ env_logger = "0.10.0" lazy_static = "1.4" log = "0.4.3" prometheus = { version = "0.12.0", features = ["process"] } -prost = "0.8.0" +prost = "0.12" regex = "1" tokio = { version = "1.0.1", features = ["time", "fs", "macros", "signal"] } -tonic = "0.5.2" +tonic = "0.10" rscam = "0.5.5" [build-dependencies] -tonic-build = "0.5.2" +tonic-build = "0.10" diff --git a/samples/brokers/udev-video-broker/src/util/camera.rs b/samples/brokers/udev-video-broker/src/util/camera.rs index 58b176209..5245d93fb 100644 --- a/samples/brokers/udev-video-broker/src/util/camera.rs +++ b/samples/brokers/udev-video-broker/src/util/camera.rs @@ -1,5 +1,7 @@ +#[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] pub struct NotifyRequest {} +#[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] pub struct NotifyResponse { #[prost(bytes = "vec", tag = "1")] @@ -7,19 +9,20 @@ pub struct NotifyResponse { #[prost(string, tag = "2")] pub camera: ::prost::alloc::string::String, } -#[doc = r" Generated client implementations."] +/// Generated client implementations. pub mod camera_client { #![allow(unused_variables, dead_code, missing_docs, clippy::let_unit_value)] + use tonic::codegen::http::Uri; use tonic::codegen::*; #[derive(Debug, Clone)] pub struct CameraClient { inner: tonic::client::Grpc, } impl CameraClient { - #[doc = r" Attempt to create a new client by connecting to a given endpoint."] + /// Attempt to create a new client by connecting to a given endpoint. pub async fn connect(dst: D) -> Result where - D: std::convert::TryInto, + D: TryInto, D::Error: Into, { let conn = tonic::transport::Endpoint::new(dst)?.connect().await?; @@ -29,20 +32,25 @@ pub mod camera_client { impl CameraClient where T: tonic::client::GrpcService, - T::ResponseBody: Body + Send + Sync + 'static, T::Error: Into, + T::ResponseBody: Body + Send + 'static, ::Error: Into + Send, { pub fn new(inner: T) -> Self { let inner = tonic::client::Grpc::new(inner); Self { inner } } + pub fn with_origin(inner: T, origin: Uri) -> Self { + let inner = tonic::client::Grpc::with_origin(inner, origin); + Self { inner } + } pub fn with_interceptor( inner: T, interceptor: F, ) -> CameraClient> where F: tonic::service::Interceptor, + T::ResponseBody: Default, T: tonic::codegen::Service< http::Request, Response = http::Response< @@ -54,23 +62,41 @@ pub mod camera_client { { CameraClient::new(InterceptedService::new(inner, interceptor)) } - #[doc = r" Compress requests with `gzip`."] - #[doc = r""] - #[doc = r" This requires the server to support it otherwise it might respond with an"] - #[doc = r" error."] - pub fn send_gzip(mut self) -> Self { - self.inner = self.inner.send_gzip(); + /// Compress requests with the given encoding. + /// + /// This requires the server to support it otherwise it might respond with an + /// error. + #[must_use] + pub fn send_compressed(mut self, encoding: CompressionEncoding) -> Self { + self.inner = self.inner.send_compressed(encoding); + self + } + /// Enable decompressing responses. + #[must_use] + pub fn accept_compressed(mut self, encoding: CompressionEncoding) -> Self { + self.inner = self.inner.accept_compressed(encoding); self } - #[doc = r" Enable decompressing responses with `gzip`."] - pub fn accept_gzip(mut self) -> Self { - self.inner = self.inner.accept_gzip(); + /// Limits the maximum size of a decoded message. + /// + /// Default: `4MB` + #[must_use] + pub fn max_decoding_message_size(mut self, limit: usize) -> Self { + self.inner = self.inner.max_decoding_message_size(limit); + self + } + /// Limits the maximum size of an encoded message. + /// + /// Default: `usize::MAX` + #[must_use] + pub fn max_encoding_message_size(mut self, limit: usize) -> Self { + self.inner = self.inner.max_encoding_message_size(limit); self } pub async fn get_frame( &mut self, request: impl tonic::IntoRequest, - ) -> Result, tonic::Status> { + ) -> std::result::Result, tonic::Status> { self.inner.ready().await.map_err(|e| { tonic::Status::new( tonic::Code::Unknown, @@ -79,37 +105,46 @@ pub mod camera_client { })?; let codec = tonic::codec::ProstCodec::default(); let path = http::uri::PathAndQuery::from_static("/camera.Camera/GetFrame"); - self.inner.unary(request.into_request(), path, codec).await + let mut req = request.into_request(); + req.extensions_mut() + .insert(GrpcMethod::new("camera.Camera", "GetFrame")); + self.inner.unary(req, path, codec).await } } } -#[doc = r" Generated server implementations."] +/// Generated server implementations. pub mod camera_server { #![allow(unused_variables, dead_code, missing_docs, clippy::let_unit_value)] use tonic::codegen::*; - #[doc = "Generated trait containing gRPC methods that should be implemented for use with CameraServer."] + /// Generated trait containing gRPC methods that should be implemented for use with CameraServer. #[async_trait] pub trait Camera: Send + Sync + 'static { async fn get_frame( &self, request: tonic::Request, - ) -> Result, tonic::Status>; + ) -> std::result::Result, tonic::Status>; } #[derive(Debug)] pub struct CameraServer { inner: _Inner, - accept_compression_encodings: (), - send_compression_encodings: (), + accept_compression_encodings: EnabledCompressionEncodings, + send_compression_encodings: EnabledCompressionEncodings, + max_decoding_message_size: Option, + max_encoding_message_size: Option, } struct _Inner(Arc); impl CameraServer { pub fn new(inner: T) -> Self { - let inner = Arc::new(inner); + Self::from_arc(Arc::new(inner)) + } + pub fn from_arc(inner: Arc) -> Self { let inner = _Inner(inner); Self { inner, accept_compression_encodings: Default::default(), send_compression_encodings: Default::default(), + max_decoding_message_size: None, + max_encoding_message_size: None, } } pub fn with_interceptor(inner: T, interceptor: F) -> InterceptedService @@ -118,17 +153,48 @@ pub mod camera_server { { InterceptedService::new(Self::new(inner), interceptor) } + /// Enable decompressing requests with the given encoding. + #[must_use] + pub fn accept_compressed(mut self, encoding: CompressionEncoding) -> Self { + self.accept_compression_encodings.enable(encoding); + self + } + /// Compress responses with the given encoding, if the client supports it. + #[must_use] + pub fn send_compressed(mut self, encoding: CompressionEncoding) -> Self { + self.send_compression_encodings.enable(encoding); + self + } + /// Limits the maximum size of a decoded message. + /// + /// Default: `4MB` + #[must_use] + pub fn max_decoding_message_size(mut self, limit: usize) -> Self { + self.max_decoding_message_size = Some(limit); + self + } + /// Limits the maximum size of an encoded message. + /// + /// Default: `usize::MAX` + #[must_use] + pub fn max_encoding_message_size(mut self, limit: usize) -> Self { + self.max_encoding_message_size = Some(limit); + self + } } impl tonic::codegen::Service> for CameraServer where T: Camera, - B: Body + Send + Sync + 'static, + B: Body + Send + 'static, B::Error: Into + Send + 'static, { type Response = http::Response; - type Error = Never; + type Error = std::convert::Infallible; type Future = BoxFuture; - fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll> { + fn poll_ready( + &mut self, + _cx: &mut Context<'_>, + ) -> Poll> { Poll::Ready(Ok(())) } fn call(&mut self, req: http::Request) -> Self::Future { @@ -144,22 +210,30 @@ pub mod camera_server { &mut self, request: tonic::Request, ) -> Self::Future { - let inner = self.0.clone(); - let fut = async move { (*inner).get_frame(request).await }; + let inner = Arc::clone(&self.0); + let fut = + async move { ::get_frame(&inner, request).await }; Box::pin(fut) } } let accept_compression_encodings = self.accept_compression_encodings; let send_compression_encodings = self.send_compression_encodings; + let max_decoding_message_size = self.max_decoding_message_size; + let max_encoding_message_size = self.max_encoding_message_size; let inner = self.inner.clone(); let fut = async move { let inner = inner.0; let method = GetFrameSvc(inner); let codec = tonic::codec::ProstCodec::default(); - let mut grpc = tonic::server::Grpc::new(codec).apply_compression_config( - accept_compression_encodings, - send_compression_encodings, - ); + let mut grpc = tonic::server::Grpc::new(codec) + .apply_compression_config( + accept_compression_encodings, + send_compression_encodings, + ) + .apply_max_message_size_config( + max_decoding_message_size, + max_encoding_message_size, + ); let res = grpc.unary(method, req).await; Ok(res) }; @@ -183,12 +257,14 @@ pub mod camera_server { inner, accept_compression_encodings: self.accept_compression_encodings, send_compression_encodings: self.send_compression_encodings, + max_decoding_message_size: self.max_decoding_message_size, + max_encoding_message_size: self.max_encoding_message_size, } } } impl Clone for _Inner { fn clone(&self) -> Self { - Self(self.0.clone()) + Self(Arc::clone(&self.0)) } } impl std::fmt::Debug for _Inner { @@ -196,7 +272,7 @@ pub mod camera_server { write!(f, "{:?}", self.0) } } - impl tonic::transport::NamedService for CameraServer { + impl tonic::server::NamedService for CameraServer { const NAME: &'static str = "camera.Camera"; } } diff --git a/shared/Cargo.toml b/shared/Cargo.toml index d3c1e4c33..c0001783b 100644 --- a/shared/Cargo.toml +++ b/shared/Cargo.toml @@ -24,7 +24,7 @@ serde_derive = "1.0" serde_json = "1.0" serde_yaml = "0.8" tokio = { version = "1.0.1", features = ["full"] } -tonic = "0.5.2" +tonic = "0.10" tower = "0.4.8" warp = "0.3.6"