From 5e798a3312dbeb42d331b4e767647db3ec62c4c2 Mon Sep 17 00:00:00 2001 From: Flint Date: Fri, 11 Oct 2024 12:55:27 +0900 Subject: [PATCH] substrate-offchain: upgrade hyper to v1 --- Cargo.lock | 67 +++++--- Cargo.toml | 5 +- polkadot/node/service/src/lib.rs | 6 +- prdoc/pr_5919.prdoc | 19 +++ substrate/bin/node/cli/src/service.rs | 14 +- substrate/client/offchain/Cargo.toml | 8 +- substrate/client/offchain/src/api.rs | 2 +- substrate/client/offchain/src/api/http.rs | 183 +++++++++++++--------- substrate/client/offchain/src/lib.rs | 11 +- templates/minimal/node/src/service.rs | 12 +- templates/parachain/node/src/service.rs | 12 +- templates/solochain/node/src/service.rs | 12 +- 12 files changed, 208 insertions(+), 143 deletions(-) create mode 100644 prdoc/pr_5919.prdoc diff --git a/Cargo.lock b/Cargo.lock index ca71985b69f07..ed6572fd7de79 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -7432,16 +7432,17 @@ dependencies = [ [[package]] name = "hyper-rustls" -version = "0.27.2" +version = "0.27.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5ee4be2c948921a1a5320b629c4193916ed787a7f7f293fd3f7f5a6c9de74155" +checksum = "08afdbb5c31130e3034af566421053ab03787c640246a446327f550d11bcb333" dependencies = [ "futures-util", "http 1.1.0", "hyper 1.3.1", "hyper-util", "log", - "rustls 0.23.10", + "rustls 0.23.14", + "rustls-native-certs 0.8.0", "rustls-pki-types", "tokio", "tokio-rustls 0.26.0", @@ -8077,7 +8078,7 @@ dependencies = [ "http 1.1.0", "jsonrpsee-core 0.23.2", "pin-project", - "rustls 0.23.10", + "rustls 0.23.14", "rustls-pki-types", "rustls-platform-verifier", "soketto 0.8.0", @@ -8100,7 +8101,7 @@ dependencies = [ "http 1.1.0", "jsonrpsee-core 0.24.3", "pin-project", - "rustls 0.23.10", + "rustls 0.23.14", "rustls-pki-types", "rustls-platform-verifier", "soketto 0.8.0", @@ -8213,11 +8214,11 @@ dependencies = [ "base64 0.22.1", "http-body 1.0.0", "hyper 1.3.1", - "hyper-rustls 0.27.2", + "hyper-rustls 0.27.3", "hyper-util", "jsonrpsee-core 0.24.3", "jsonrpsee-types 0.24.3", - "rustls 0.23.10", + "rustls 0.23.14", "rustls-platform-verifier", "serde", "serde_json", @@ -17012,7 +17013,7 @@ dependencies = [ "quinn-proto 0.11.8", "quinn-udp 0.5.4", "rustc-hash 2.0.0", - "rustls 0.23.10", + "rustls 0.23.14", "socket2 0.5.7", "thiserror", "tokio", @@ -17046,7 +17047,7 @@ dependencies = [ "rand", "ring 0.17.7", "rustc-hash 2.0.0", - "rustls 0.23.10", + "rustls 0.23.14", "slab", "thiserror", "tinyvec", @@ -17539,7 +17540,7 @@ dependencies = [ "http-body 1.0.0", "http-body-util", "hyper 1.3.1", - "hyper-rustls 0.27.2", + "hyper-rustls 0.27.3", "hyper-util", "ipnet", "js-sys", @@ -17549,7 +17550,7 @@ dependencies = [ "percent-encoding", "pin-project-lite", "quinn 0.11.5", - "rustls 0.23.10", + "rustls 0.23.14", "rustls-pemfile 2.0.0", "rustls-pki-types", "serde", @@ -18118,22 +18119,22 @@ dependencies = [ "log", "ring 0.17.7", "rustls-pki-types", - "rustls-webpki 0.102.4", + "rustls-webpki 0.102.8", "subtle 2.5.0", "zeroize", ] [[package]] name = "rustls" -version = "0.23.10" +version = "0.23.14" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "05cff451f60db80f490f3c182b77c35260baace73209e9cdbbe526bfe3a4d402" +checksum = "415d9944693cb90382053259f89fbb077ea730ad7273047ec63b19bc9b160ba8" dependencies = [ "log", "once_cell", "ring 0.17.7", "rustls-pki-types", - "rustls-webpki 0.102.4", + "rustls-webpki 0.102.8", "subtle 2.5.0", "zeroize", ] @@ -18163,6 +18164,19 @@ dependencies = [ "security-framework", ] +[[package]] +name = 
"rustls-native-certs" +version = "0.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fcaf18a4f2be7326cd874a5fa579fae794320a0f388d365dca7e480e55f83f8a" +dependencies = [ + "openssl-probe", + "rustls-pemfile 2.0.0", + "rustls-pki-types", + "schannel", + "security-framework", +] + [[package]] name = "rustls-pemfile" version = "1.0.3" @@ -18184,9 +18198,9 @@ dependencies = [ [[package]] name = "rustls-pki-types" -version = "1.7.0" +version = "1.9.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "976295e77ce332211c0d24d92c0e83e50f5c5f046d11082cea19f3df13a3562d" +checksum = "0e696e35370c65c9c541198af4543ccd580cf17fc25d8e05c5a242b202488c55" [[package]] name = "rustls-platform-verifier" @@ -18199,10 +18213,10 @@ dependencies = [ "jni", "log", "once_cell", - "rustls 0.23.10", + "rustls 0.23.14", "rustls-native-certs 0.7.0", "rustls-platform-verifier-android", - "rustls-webpki 0.102.4", + "rustls-webpki 0.102.8", "security-framework", "security-framework-sys", "webpki-roots 0.26.3", @@ -18227,9 +18241,9 @@ dependencies = [ [[package]] name = "rustls-webpki" -version = "0.102.4" +version = "0.102.8" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ff448f7e92e913c4b7d4c6d8e4540a1724b319b4152b8aef6d4cf8339712b33e" +checksum = "64ca1bc8749bd4cf37b5ce386cc146580777b4e8572c7b97baf22c83f444bee9" dependencies = [ "ring 0.17.7", "rustls-pki-types", @@ -19403,14 +19417,17 @@ dependencies = [ "fnv", "futures", "futures-timer", - "hyper 0.14.29", - "hyper-rustls 0.24.2", + "http-body-util", + "hyper 1.3.1", + "hyper-rustls 0.27.3", + "hyper-util", "log", "num_cpus", "once_cell", "parity-scale-codec", "parking_lot 0.12.3", "rand", + "rustls 0.23.14", "sc-block-builder", "sc-client-api", "sc-client-db", @@ -24878,7 +24895,7 @@ version = "0.26.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0c7bc40d0e5a97695bb96e27995cd3a08538541b0a846f65bba7a359f36700d4" dependencies = [ - "rustls 0.23.10", + "rustls 0.23.14", "rustls-pki-types", "tokio", ] @@ -25602,7 +25619,7 @@ dependencies = [ "flate2", "log", "once_cell", - "rustls 0.23.10", + "rustls 0.23.14", "rustls-pki-types", "serde", "serde_json", diff --git a/Cargo.toml b/Cargo.toml index fde05e90ca6e6..5c4936e5754f9 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -799,10 +799,8 @@ http = { version = "1.1" } http-body = { version = "1", default-features = false } http-body-util = { version = "0.1.2", default-features = false } hyper = { version = "1.3.1", default-features = false } -hyper-rustls = { version = "0.24.2" } +hyper-rustls = { version = "0.27.3", default-features = false, features = ["http1", "http2", "logging", "ring", "rustls-native-certs", "tls12"] } hyper-util = { version = "0.1.5", default-features = false } -# TODO: remove hyper v0.14 https://github.com/paritytech/polkadot-sdk/issues/4896 -hyperv14 = { package = "hyper", version = "0.14.29", default-features = false } impl-serde = { version = "0.4.0", default-features = false } impl-trait-for-tuples = { version = "0.2.2" } indexmap = { version = "2.0.0" } @@ -1117,6 +1115,7 @@ rstest = { version = "0.18.2" } rustc-hash = { version = "1.1.0" } rustc-hex = { version = "2.1.0", default-features = false } rustix = { version = "0.36.7", default-features = false } +rustls = { version = "0.23.14", default-features = false, features = ["logging", "ring", "std", "tls12"] } rustversion = { version = "1.0.17" } rusty-fork = { version = "0.3.0", default-features = false } safe-mix = { version 
= "1.0", default-features = false } diff --git a/polkadot/node/service/src/lib.rs b/polkadot/node/service/src/lib.rs index 9515dd2311380..5ae69921fd75b 100644 --- a/polkadot/node/service/src/lib.rs +++ b/polkadot/node/service/src/lib.rs @@ -1042,7 +1042,7 @@ pub fn new_full< is_validator: role.is_authority(), enable_http_requests: false, custom_extensions: move |_| vec![], - }) + })? .run(client.clone(), task_manager.spawn_handle()) .boxed(), ); @@ -1434,7 +1434,7 @@ pub fn new_chain_ops( } else if config.chain_spec.is_kusama() { chain_ops!(config, None) } else if config.chain_spec.is_westend() { - return chain_ops!(config, None) + return chain_ops!(config, None); } else { chain_ops!(config, None) } @@ -1486,7 +1486,7 @@ pub fn revert_backend( let revertible = blocks.min(best_number - finalized); if revertible == 0 { - return Ok(()) + return Ok(()); } let number = best_number - revertible; diff --git a/prdoc/pr_5919.prdoc b/prdoc/pr_5919.prdoc new file mode 100644 index 0000000000000..5012e21187a5e --- /dev/null +++ b/prdoc/pr_5919.prdoc @@ -0,0 +1,19 @@ +# Schema: Polkadot SDK PRDoc Schema (prdoc) v1.0.0 +# See doc at https://raw.githubusercontent.com/paritytech/polkadot-sdk/master/prdoc/schema_user.json + +title: "substrate-offchain: upgrade hyper to v1" + +doc: + - audience: Node Dev + description: | + Bump depencency `hyper` of `substrait-offchain` for http from `0.14` to `1`. + This changed APIs a bit; + - `sc_offchain::Offchainworker::new()` now returns `std::io::Result` (Previously was `Self`) + +crates: + - name: sc-offchain + bump: minor + - name: polkadot-service + bump: patch + - name: staging-node-cli + bump: patch diff --git a/substrate/bin/node/cli/src/service.rs b/substrate/bin/node/cli/src/service.rs index 69e953f54e42e..3f6afb79f9512 100644 --- a/substrate/bin/node/cli/src/service.rs +++ b/substrate/bin/node/cli/src/service.rs @@ -783,9 +783,7 @@ pub fn new_full_base::Hash>>( ); if enable_offchain_worker { - task_manager.spawn_handle().spawn( - "offchain-workers-runner", - "offchain-work", + let offchain_workers = sc_offchain::OffchainWorkers::new(sc_offchain::OffchainWorkerOptions { runtime_api_provider: client.clone(), keystore: Some(keystore_container.keystore()), @@ -799,9 +797,11 @@ pub fn new_full_base::Hash>>( custom_extensions: move |_| { vec![Box::new(statement_store.clone().as_statement_store_ext()) as Box<_>] }, - }) - .run(client.clone(), task_manager.spawn_handle()) - .boxed(), + })?; + task_manager.spawn_handle().spawn( + "offchain-workers-runner", + "offchain-work", + offchain_workers.run(client.clone(), task_manager.spawn_handle()).boxed(), ); } @@ -985,7 +985,7 @@ mod tests { sc_consensus_babe::authorship::claim_slot(slot.into(), &epoch, &keystore) .map(|(digest, _)| digest) { - break (babe_pre_digest, epoch_descriptor) + break (babe_pre_digest, epoch_descriptor); } slot += 1; diff --git a/substrate/client/offchain/Cargo.toml b/substrate/client/offchain/Cargo.toml index bbbe7018d1060..0b23c0d125ac0 100644 --- a/substrate/client/offchain/Cargo.toml +++ b/substrate/client/offchain/Cargo.toml @@ -22,15 +22,15 @@ codec = { features = ["derive"], workspace = true, default-features = true } fnv = { workspace = true } futures = { workspace = true } futures-timer = { workspace = true } -hyperv14 = { features = [ - "http2", - "stream", -], workspace = true, default-features = true } +http-body-util = { workspace = true } +hyper = { features = ["http2"], workspace = true, default-features = true } hyper-rustls = { features = ["http2"], workspace = true } +hyper-util 
= { features = ["client-legacy"], workspace = true } num_cpus = { workspace = true } once_cell = { workspace = true } parking_lot = { workspace = true, default-features = true } rand = { workspace = true, default-features = true } +rustls = { workspace = true } threadpool = { workspace = true } tracing = { workspace = true, default-features = true } sc-client-api = { workspace = true, default-features = true } diff --git a/substrate/client/offchain/src/api.rs b/substrate/client/offchain/src/api.rs index 19ccdbcf498f4..a5981f14c093c 100644 --- a/substrate/client/offchain/src/api.rs +++ b/substrate/client/offchain/src/api.rs @@ -326,7 +326,7 @@ mod tests { fn offchain_api() -> (Api, AsyncApi) { sp_tracing::try_init_simple(); let mock = Arc::new(TestNetwork()); - let shared_client = SharedClient::new(); + let shared_client = SharedClient::new().unwrap(); AsyncApi::new(mock, false, shared_client) } diff --git a/substrate/client/offchain/src/api/http.rs b/substrate/client/offchain/src/api/http.rs index 73407b1359d77..56f5c0230094e 100644 --- a/substrate/client/offchain/src/api/http.rs +++ b/substrate/client/offchain/src/api/http.rs @@ -27,14 +27,14 @@ //! (i.e.: the socket should continue being processed) in the background even if the runtime isn't //! actively calling any function. -use hyperv14 as hyper; - use crate::api::timestamp; use bytes::buf::{Buf, Reader}; use fnv::FnvHashMap; use futures::{channel::mpsc, future, prelude::*}; -use hyper::{client, Body, Client as HyperClient}; +use http_body_util::{combinators::BoxBody, StreamBody}; +use hyper::body::Body as _; use hyper_rustls::{HttpsConnector, HttpsConnectorBuilder}; +use hyper_util::{client::legacy as client, rt::TokioExecutor}; use once_cell::sync::Lazy; use sc_utils::mpsc::{tracing_unbounded, TracingUnboundedReceiver, TracingUnboundedSender}; use sp_core::offchain::{HttpError, HttpRequestId, HttpRequestStatus, Timestamp}; @@ -48,21 +48,26 @@ use std::{ const LOG_TARGET: &str = "offchain-worker::http"; +pub type Body = BoxBody; + +type Sender = mpsc::Sender, hyper::Error>>; +type Receiver = mpsc::Receiver, hyper::Error>>; + +type HyperClient = client::Client, Body>; +type LazyClient = Lazy HyperClient + Send>>; + /// Wrapper struct used for keeping the hyper_rustls client running. #[derive(Clone)] -pub struct SharedClient(Arc, Body>>>); +pub struct SharedClient(Arc); impl SharedClient { - pub fn new() -> Self { - Self(Arc::new(Lazy::new(|| { - let connector = HttpsConnectorBuilder::new() - .with_native_roots() - .https_or_http() - .enable_http1() - .enable_http2() - .build(); - HyperClient::builder().build(connector) - }))) + pub fn new() -> std::io::Result { + let builder = HttpsConnectorBuilder::new() + .with_provider_and_native_roots(rustls::crypto::ring::default_provider())?; + Ok(Self(Arc::new(Lazy::new(Box::new(|| { + let connector = builder.https_or_http().enable_http1().enable_http2().build(); + client::Client::builder(TokioExecutor::new()).build(connector) + }))))) } } @@ -105,23 +110,23 @@ pub struct HttpApi { /// One active request within `HttpApi`. enum HttpApiRequest { /// The request object is being constructed locally and not started yet. - NotDispatched(hyper::Request, hyper::body::Sender), + NotDispatched(hyper::Request, Sender), /// The request has been dispatched and we're in the process of sending out the body (if the /// field is `Some`) or waiting for a response (if the field is `None`). - Dispatched(Option), + Dispatched(Option), /// Received a response. 
Response(HttpApiRequestRp), /// A request has been dispatched but the worker notified us of an error. We report this /// failure to the user as an `IoError` and remove the request from the list as soon as /// possible. - Fail(hyper::Error), + Fail(client::Error), } /// A request within `HttpApi` that has received a response. struct HttpApiRequestRp { /// We might still be writing the request's body when the response comes. /// This field allows to continue writing that body. - sending_body: Option, + sending_body: Option, /// Status code of the response. status_code: hyper::StatusCode, /// Headers of the response. @@ -132,7 +137,7 @@ struct HttpApiRequestRp { /// Elements extracted from the channel are first put into `current_read_chunk`. /// If the channel produces an error, then that is translated into an `IoError` and the request /// is removed from the list. - body: stream::Fuse>>, + body: stream::Fuse, /// Chunk that has been extracted from the channel and that is currently being read. /// Reading data from the response should read from this field in priority. current_read_chunk: Option>, @@ -144,7 +149,9 @@ impl HttpApi { // Start by building the prototype of the request. // We do this first so that we don't touch anything in `self` if building the prototype // fails. - let (body_sender, body) = hyper::Body::channel(); + let (body_sender, receiver) = mpsc::channel(0); + let body = StreamBody::new(receiver); + let body = BoxBody::new(body); let mut request = hyper::Request::new(body); *request.method_mut() = hyper::Method::from_bytes(method.as_bytes()).map_err(|_| ())?; *request.uri_mut() = hyper::Uri::from_maybe_shared(uri.to_owned()).map_err(|_| ())?; @@ -158,7 +165,7 @@ impl HttpApi { target: LOG_TARGET, "Overflow in offchain worker HTTP request ID assignment" ); - return Err(()) + return Err(()); }, }; self.requests @@ -213,7 +220,7 @@ impl HttpApi { // Closure that writes data to a sender, taking the deadline into account. Can return `Ok` // (if the body has been written), or `DeadlineReached`, or `IoError`. // If `IoError` is returned, don't forget to remove the request from the list. 
- let mut poll_sender = move |sender: &mut hyper::body::Sender| -> Result<(), HttpError> { + let mut poll_sender = move |sender: &mut Sender| -> Result<(), HttpError> { let mut when_ready = future::maybe_done(future::poll_fn(|cx| sender.poll_ready(cx))); futures::executor::block_on(future::select(&mut when_ready, &mut deadline)); match when_ready { @@ -221,12 +228,15 @@ impl HttpApi { future::MaybeDone::Done(Err(_)) => return Err(HttpError::IoError), future::MaybeDone::Future(_) | future::MaybeDone::Gone => { debug_assert!(matches!(deadline, future::MaybeDone::Done(..))); - return Err(HttpError::DeadlineReached) + return Err(HttpError::DeadlineReached); }, }; futures::executor::block_on( - sender.send_data(hyper::body::Bytes::from(chunk.to_owned())), + async { + future::poll_fn(|cx| sender.poll_ready(cx)).await?; + sender.start_send(Ok(hyper::body::Frame::data(hyper::body::Bytes::from(chunk.to_owned())))) + } ) .map_err(|_| { tracing::error!(target: "offchain-worker::http", "HTTP sender refused data despite being ready"); @@ -250,13 +260,13 @@ impl HttpApi { match poll_sender(&mut sender) { Err(HttpError::IoError) => { tracing::debug!(target: LOG_TARGET, id = %request_id.0, "Encountered io error while trying to add new chunk to body"); - return Err(HttpError::IoError) + return Err(HttpError::IoError); }, other => { tracing::debug!(target: LOG_TARGET, id = %request_id.0, res = ?other, "Added chunk to body"); self.requests .insert(request_id, HttpApiRequest::Dispatched(Some(sender))); - return other + return other; }, } } else { @@ -265,7 +275,7 @@ impl HttpApi { // Writing an empty body is a hint that we should stop writing. Dropping // the sender. self.requests.insert(request_id, HttpApiRequest::Dispatched(None)); - return Ok(()) + return Ok(()); } }, @@ -281,13 +291,13 @@ impl HttpApi { ) { Err(HttpError::IoError) => { tracing::debug!(target: LOG_TARGET, id = %request_id.0, "Encountered io error while trying to add new chunk to body"); - return Err(HttpError::IoError) + return Err(HttpError::IoError); }, other => { tracing::debug!(target: LOG_TARGET, id = %request_id.0, res = ?other, "Added chunk to body"); self.requests .insert(request_id, HttpApiRequest::Response(response)); - return other + return other; }, } } else { @@ -302,7 +312,7 @@ impl HttpApi { ..response }), ); - return Ok(()) + return Ok(()); } }, @@ -311,7 +321,7 @@ impl HttpApi { // If the request has already failed, return without putting back the request // in the list. - return Err(HttpError::IoError) + return Err(HttpError::IoError); }, v @ HttpApiRequest::Dispatched(None) | @@ -320,7 +330,7 @@ impl HttpApi { // We have already finished sending this body. self.requests.insert(request_id, v); - return Err(HttpError::Invalid) + return Err(HttpError::Invalid); }, } } @@ -340,7 +350,7 @@ impl HttpApi { Some(HttpApiRequest::Dispatched(sending_body)) | Some(HttpApiRequest::Response(HttpApiRequestRp { sending_body, .. 
})) => { let _ = sending_body.take(); - continue + continue; }, _ => continue, }; @@ -405,7 +415,7 @@ impl HttpApi { }, } } - return output + return output; } } @@ -418,7 +428,7 @@ impl HttpApi { msg } else { debug_assert!(matches!(deadline, future::MaybeDone::Done(..))); - continue + continue; } }; @@ -458,7 +468,7 @@ impl HttpApi { None => { tracing::error!(target: "offchain-worker::http", "Worker has crashed"); - return ids.iter().map(|_| HttpRequestStatus::IoError).collect() + return ids.iter().map(|_| HttpRequestStatus::IoError).collect(); }, } } @@ -498,14 +508,14 @@ impl HttpApi { // and we still haven't received a response. Some(rq @ HttpApiRequest::Dispatched(_)) => { self.requests.insert(request_id, rq); - return Err(HttpError::DeadlineReached) + return Err(HttpError::DeadlineReached); }, // The request has failed. Some(HttpApiRequest::Fail { .. }) => return Err(HttpError::IoError), // Request hasn't been dispatched yet; reading the body is invalid. Some(rq @ HttpApiRequest::NotDispatched(_, _)) => { self.requests.insert(request_id, rq); - return Err(HttpError::Invalid) + return Err(HttpError::Invalid); }, None => return Err(HttpError::Invalid), }; @@ -526,12 +536,12 @@ impl HttpApi { ..response }), ); - return Ok(n) + return Ok(n); }, Err(err) => { // This code should never be reached unless there's a logic error somewhere. tracing::error!(target: "offchain-worker::http", "Failed to read from current read chunk: {:?}", err); - return Err(HttpError::IoError) + return Err(HttpError::IoError); }, } } @@ -544,7 +554,10 @@ impl HttpApi { if let future::MaybeDone::Done(next_body) = next_body { match next_body { - Some(Ok(chunk)) => response.current_read_chunk = Some(chunk.reader()), + Some(Ok(chunk)) => + if let Ok(chunk) = chunk.into_data() { + response.current_read_chunk = Some(chunk.reader()); + }, Some(Err(_)) => return Err(HttpError::IoError), None => return Ok(0), // eof } @@ -552,7 +565,7 @@ impl HttpApi { if let future::MaybeDone::Done(_) = deadline { self.requests.insert(request_id, HttpApiRequest::Response(response)); - return Err(HttpError::DeadlineReached) + return Err(HttpError::DeadlineReached); } } } @@ -587,7 +600,7 @@ enum ApiToWorker { /// ID to send back when the response comes back. id: HttpRequestId, /// Request to start executing. - request: hyper::Request, + request: hyper::Request, }, } @@ -608,14 +621,14 @@ enum WorkerToApi { /// the next item. /// Can also be used to send an error, in case an error happened on the HTTP socket. After /// an error is sent, the channel will close. - body: mpsc::Receiver>, + body: Receiver, }, /// A request has failed because of an error. The request is then no longer valid. Fail { /// The ID that was passed to the worker. id: HttpRequestId, /// Error that happened. - error: hyper::Error, + error: client::Error, }, } @@ -626,7 +639,7 @@ pub struct HttpWorker { /// Used to receive messages from the `HttpApi`. from_api: TracingUnboundedReceiver, /// The engine that runs HTTP requests. - http_client: Arc, Body>>>, + http_client: Arc, /// HTTP requests that are being worked on by the engine. requests: Vec<(HttpRequestId, HttpWorkerRequest)>, } @@ -634,13 +647,13 @@ pub struct HttpWorker { /// HTTP request being processed by the worker. enum HttpWorkerRequest { /// Request has been dispatched and is waiting for a response from the Internet. - Dispatched(hyper::client::ResponseFuture), + Dispatched(client::ResponseFuture), /// Progressively reading the body of the response and sending it to the channel. 
ReadBody { /// Body to read `Chunk`s from. Only used if the channel is ready to accept data. - body: hyper::Body, + body: Body, /// Channel to the [`HttpApi`] where we send the chunks to. - tx: mpsc::Sender>, + tx: Sender, }, } @@ -663,12 +676,12 @@ impl Future for HttpWorker { let response = match Future::poll(Pin::new(&mut future), cx) { Poll::Pending => { me.requests.push((id, HttpWorkerRequest::Dispatched(future))); - continue + continue; }, Poll::Ready(Ok(response)) => response, Poll::Ready(Err(error)) => { let _ = me.to_api.unbounded_send(WorkerToApi::Fail { id, error }); - continue // don't insert the request back + continue; // don't insert the request back }, }; @@ -684,9 +697,12 @@ impl Future for HttpWorker { body: body_rx, }); - me.requests.push((id, HttpWorkerRequest::ReadBody { body, tx: body_tx })); + me.requests.push(( + id, + HttpWorkerRequest::ReadBody { body: Body::new(body), tx: body_tx }, + )); cx.waker().wake_by_ref(); // reschedule in order to poll the new future - continue + continue; }, HttpWorkerRequest::ReadBody { mut body, mut tx } => { @@ -697,12 +713,11 @@ impl Future for HttpWorker { Poll::Ready(Err(_)) => continue, // don't insert the request back Poll::Pending => { me.requests.push((id, HttpWorkerRequest::ReadBody { body, tx })); - continue + continue; }, } - // `tx` is ready. Read a chunk from the socket and send it to the channel. - match Stream::poll_next(Pin::new(&mut body), cx) { + match Pin::new(&mut body).poll_frame(cx) { Poll::Ready(Some(Ok(chunk))) => { let _ = tx.start_send(Ok(chunk)); me.requests.push((id, HttpWorkerRequest::ReadBody { body, tx })); @@ -762,19 +777,22 @@ mod tests { }; use crate::api::timestamp; use core::convert::Infallible; - use futures::{future, StreamExt}; + use futures::future; + use http_body_util::BodyExt; use sp_core::offchain::{Duration, Externalities, HttpError, HttpRequestId, HttpRequestStatus}; use std::sync::LazyLock; // Using LazyLock to avoid spawning lots of different SharedClients, // as spawning a SharedClient is CPU-intensive and opens lots of fds. - static SHARED_CLIENT: LazyLock = LazyLock::new(|| SharedClient::new()); + static SHARED_CLIENT: LazyLock = LazyLock::new(|| SharedClient::new().unwrap()); // Returns an `HttpApi` whose worker is ran in the background, and a `SocketAddr` to an HTTP // server that runs in the background as well. macro_rules! build_api_server { () => { - build_api_server!(hyper::Response::new(hyper::Body::from("Hello World!"))) + build_api_server!(hyper::Response::new(http_body_util::Full::new( + hyper::body::Bytes::from("Hello World!") + ))) }; ( $response:expr ) => {{ let hyper_client = SHARED_CLIENT.clone(); @@ -785,21 +803,32 @@ mod tests { let rt = tokio::runtime::Runtime::new().unwrap(); let worker = rt.spawn(worker); let server = rt.spawn(async move { - let server = hyper::Server::bind(&"127.0.0.1:0".parse().unwrap()).serve( - hyper::service::make_service_fn(|_| async move { - Ok::<_, Infallible>(hyper::service::service_fn( - move |req: hyper::Request| async move { - // Wait until the complete request was received and processed, - // otherwise the tests are flaky. 
- let _ = req.into_body().collect::>().await; - - Ok::<_, Infallible>($response) - }, - )) - }), - ); - let _ = addr_tx.send(server.local_addr()); - server.await.map_err(drop) + let addr = std::net::SocketAddr::from(([127, 0, 0, 1], 0)); + let listener = tokio::net::TcpListener::bind(addr).await.unwrap(); + let _ = addr_tx.send(listener.local_addr().unwrap()); + loop { + let (stream, _) = listener.accept().await.unwrap(); + let io = hyper_util::rt::TokioIo::new(stream); + tokio::task::spawn(async move { + if let Err(err) = hyper::server::conn::http1::Builder::new() + .serve_connection( + io, + hyper::service::service_fn( + move |req: hyper::Request| async move { + // Wait until the complete request was received and + // processed, otherwise the tests are flaky. + let _ = req.into_body().collect().await; + + Ok::<_, Infallible>($response) + }, + ), + ) + .await + { + eprintln!("Error serving connection: {:?}", err); + } + }); + } }); let _ = rt.block_on(future::join(worker, server)); }); @@ -839,7 +868,7 @@ mod tests { let (mut api, addr) = build_api_server!(hyper::Response::builder() .version(hyper::Version::HTTP_2) - .body(hyper::Body::from("Hello World!")) + .body(http_body_util::Full::new(hyper::body::Bytes::from("Hello World!"))) .unwrap()); let id = api.request_start("POST", &format!("http://{}", addr)).unwrap(); @@ -1097,7 +1126,7 @@ mod tests { #[test] fn shared_http_client_is_only_initialized_on_access() { - let shared_client = SharedClient::new(); + let shared_client = SharedClient::new().unwrap(); { let mock = Arc::new(TestNetwork()); @@ -1112,7 +1141,7 @@ mod tests { // Check that the http client wasn't initialized, because it wasn't used. assert!(Lazy::into_value(Arc::try_unwrap(shared_client.0).unwrap()).is_err()); - let shared_client = SharedClient::new(); + let shared_client = SharedClient::new().unwrap(); { let mock = Arc::new(TestNetwork()); diff --git a/substrate/client/offchain/src/lib.rs b/substrate/client/offchain/src/lib.rs index 7cee64e6ce7ef..c4735c1c9733c 100644 --- a/substrate/client/offchain/src/lib.rs +++ b/substrate/client/offchain/src/lib.rs @@ -153,14 +153,14 @@ impl OffchainWorkers { enable_http_requests, custom_extensions, }: OffchainWorkerOptions, - ) -> Self { - Self { + ) -> std::io::Result { + Ok(Self { runtime_api_provider, thread_pool: Mutex::new(ThreadPool::with_name( "offchain-worker".into(), num_cpus::get(), )), - shared_http_client: api::SharedClient::new(), + shared_http_client: api::SharedClient::new()?, enable_http_requests, keystore, offchain_db: offchain_db.map(OffchainDb::new), @@ -168,7 +168,7 @@ impl OffchainWorkers { is_validator, network_provider, custom_extensions: Box::new(custom_extensions), - } + }) } } @@ -461,7 +461,8 @@ mod tests { is_validator: false, enable_http_requests: false, custom_extensions: |_| Vec::new(), - }); + }) + .unwrap(); futures::executor::block_on(offchain.on_block_imported(&header)); // then diff --git a/templates/minimal/node/src/service.rs b/templates/minimal/node/src/service.rs index 08cd345f1e3ed..f55822bace585 100644 --- a/templates/minimal/node/src/service.rs +++ b/templates/minimal/node/src/service.rs @@ -158,9 +158,7 @@ pub fn new_full::Ha })?; if config.offchain_worker.enabled { - task_manager.spawn_handle().spawn( - "offchain-workers-runner", - "offchain-worker", + let offchain_workers = sc_offchain::OffchainWorkers::new(sc_offchain::OffchainWorkerOptions { runtime_api_provider: client.clone(), is_validator: config.role.is_authority(), @@ -172,9 +170,11 @@ pub fn new_full::Ha network_provider: 
Arc::new(network.clone()), enable_http_requests: true, custom_extensions: |_| vec![], - }) - .run(client.clone(), task_manager.spawn_handle()) - .boxed(), + })?; + task_manager.spawn_handle().spawn( + "offchain-workers-runner", + "offchain-worker", + offchain_workers.run(client.clone(), task_manager.spawn_handle()).boxed(), ); } diff --git a/templates/parachain/node/src/service.rs b/templates/parachain/node/src/service.rs index 04e20be2bd40d..f245c332c4f8c 100644 --- a/templates/parachain/node/src/service.rs +++ b/templates/parachain/node/src/service.rs @@ -282,9 +282,7 @@ pub async fn start_parachain_node( if parachain_config.offchain_worker.enabled { use futures::FutureExt; - task_manager.spawn_handle().spawn( - "offchain-workers-runner", - "offchain-work", + let offchain_workers = sc_offchain::OffchainWorkers::new(sc_offchain::OffchainWorkerOptions { runtime_api_provider: client.clone(), keystore: Some(params.keystore_container.keystore()), @@ -296,9 +294,11 @@ pub async fn start_parachain_node( is_validator: parachain_config.role.is_authority(), enable_http_requests: false, custom_extensions: move |_| vec![], - }) - .run(client.clone(), task_manager.spawn_handle()) - .boxed(), + })?; + task_manager.spawn_handle().spawn( + "offchain-workers-runner", + "offchain-work", + offchain_workers.run(client.clone(), task_manager.spawn_handle()).boxed(), ); } diff --git a/templates/solochain/node/src/service.rs b/templates/solochain/node/src/service.rs index 2de543235ec8c..ff3104d06a3e2 100644 --- a/templates/solochain/node/src/service.rs +++ b/templates/solochain/node/src/service.rs @@ -194,9 +194,7 @@ pub fn new_full< })?; if config.offchain_worker.enabled { - task_manager.spawn_handle().spawn( - "offchain-workers-runner", - "offchain-worker", + let offchain_workers = sc_offchain::OffchainWorkers::new(sc_offchain::OffchainWorkerOptions { runtime_api_provider: client.clone(), is_validator: config.role.is_authority(), @@ -208,9 +206,11 @@ pub fn new_full< network_provider: Arc::new(network.clone()), enable_http_requests: true, custom_extensions: |_| vec![], - }) - .run(client.clone(), task_manager.spawn_handle()) - .boxed(), + })?; + task_manager.spawn_handle().spawn( + "offchain-workers-runner", + "offchain-worker", + offchain_workers.run(client.clone(), task_manager.spawn_handle()).boxed(), ); }
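
For reviewers coming from the hyper 0.14 code paths, the core of this migration condenses to two patterns: `hyper::Body::channel()` and the bundled `hyper::Client` no longer exist in hyper 1.x, so request bodies are now streamed as `http_body::Frame`s through an `mpsc` channel wrapped in `StreamBody`/`BoxBody`, and the pooled HTTP client comes from `hyper_util`'s legacy module with a rustls connector built on an explicit `ring` crypto provider. The sketch below is illustrative only and not part of the patch; it assumes the `hyper`, `hyper-util`, `hyper-rustls`, `http-body-util`, `rustls`, and `futures` versions pinned in this PR's Cargo.toml, and the helper names `channel_body` and `build_client` are invented for the example (inside `sc-offchain` the equivalent logic lives in `SharedClient::new` and the request-building path of `HttpApi`).

// Illustrative sketch, not part of the patch: the hyper 0.14 -> 1.x patterns this PR
// adopts, with invented helper names (`channel_body`, `build_client`).
use futures::channel::mpsc;
use http_body_util::{combinators::BoxBody, StreamBody};
use hyper::body::{Bytes, Frame};
use hyper_rustls::{HttpsConnector, HttpsConnectorBuilder};
use hyper_util::{
	client::legacy::{connect::HttpConnector, Client},
	rt::TokioExecutor,
};

/// Boxed body type used for requests in this sketch.
type Body = BoxBody<Bytes, hyper::Error>;

/// Stand-in for the removed `hyper::Body::channel()`: bytes are pushed into the sender
/// as `Frame`s and the receiver is wrapped into a streaming, boxed request body.
fn channel_body() -> (mpsc::Sender<Result<Frame<Bytes>, hyper::Error>>, Body) {
	let (tx, rx) = mpsc::channel(0);
	(tx, BoxBody::new(StreamBody::new(rx)))
}

/// Stand-in for the removed bundled `hyper::Client`: the pooled client now comes from
/// `hyper_util::client::legacy`, and the TLS connector is built with an explicit
/// `ring` crypto provider plus the platform's native roots, which can fail.
fn build_client() -> std::io::Result<Client<HttpsConnector<HttpConnector>, Body>> {
	let connector = HttpsConnectorBuilder::new()
		.with_provider_and_native_roots(rustls::crypto::ring::default_provider())?
		.https_or_http()
		.enable_http1()
		.enable_http2()
		.build();
	Ok(Client::builder(TokioExecutor::new()).build(connector))
}

Because loading the native certificate roots is fallible, building the connector is fallible too, which is why `SharedClient::new()` and, transitively, `OffchainWorkers::new()` now return a `std::io::Result` and every call site in the service files above gains a `?`.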