diff --git a/aws/sdk/integration-tests/s3/Cargo.toml b/aws/sdk/integration-tests/s3/Cargo.toml index 8ba7542e603..7a5d80e8ba3 100644 --- a/aws/sdk/integration-tests/s3/Cargo.toml +++ b/aws/sdk/integration-tests/s3/Cargo.toml @@ -26,6 +26,7 @@ aws-smithy-protocol-test = { path = "../../build/aws-sdk/sdk/aws-smithy-protocol aws-smithy-runtime = { path = "../../build/aws-sdk/sdk/aws-smithy-runtime", features = ["test-util", "wire-mock"] } aws-smithy-runtime-api = { path = "../../build/aws-sdk/sdk/aws-smithy-runtime-api", features = ["test-util"] } aws-smithy-types = { path = "../../build/aws-sdk/sdk/aws-smithy-types" } +aws-smithy-experimental = { path = "../../build/aws-sdk/sdk/aws-smithy-experimental" } aws-types = { path = "../../build/aws-sdk/sdk/aws-types" } bytes = "1" bytes-utils = "0.1.2" diff --git a/aws/sdk/integration-tests/s3/tests/hyper-10.rs b/aws/sdk/integration-tests/s3/tests/hyper-10.rs new file mode 100644 index 00000000000..094a4b56773 --- /dev/null +++ b/aws/sdk/integration-tests/s3/tests/hyper-10.rs @@ -0,0 +1,28 @@ +/* + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * SPDX-License-Identifier: Apache-2.0 + */ + +use aws_smithy_experimental::hyper_1_0::CryptoMode; +use aws_smithy_runtime_api::client::behavior_version::BehaviorVersion; + +#[tokio::test] +#[ignore] +async fn hyper_10_end_to_end() { + let http_client = aws_smithy_experimental::hyper_1_0::HyperClientBuilder::default() + .crypto_mode(CryptoMode::AwsLcRs) + .build_https(); + let conf = aws_config::defaults(BehaviorVersion::latest()) + .http_client(http_client) + .load() + .await; + let client = aws_sdk_s3::Client::new(&conf); + let buckets = client + .list_buckets() + .send() + .await + .expect("failed to list buckets"); + for bucket in buckets.buckets() { + println!("{}", bucket.name().unwrap()); + } +} diff --git a/buildSrc/src/main/kotlin/CrateSet.kt b/buildSrc/src/main/kotlin/CrateSet.kt index bf36433e175..c134beb9b4f 100644 --- a/buildSrc/src/main/kotlin/CrateSet.kt +++ b/buildSrc/src/main/kotlin/CrateSet.kt @@ -64,6 +64,7 @@ object CrateSet { "aws-smithy-http-tower", "aws-smithy-json", "aws-smithy-mocks-experimental", + "aws-smithy-experimental", "aws-smithy-protocol-test", "aws-smithy-query", "aws-smithy-runtime", diff --git a/rust-runtime/Cargo.toml b/rust-runtime/Cargo.toml index 74d32588863..d287e7c8783 100644 --- a/rust-runtime/Cargo.toml +++ b/rust-runtime/Cargo.toml @@ -20,5 +20,6 @@ members = [ "aws-smithy-types-convert", "aws-smithy-wasm", "aws-smithy-mocks-experimental", + "aws-smithy-experimental", "aws-smithy-xml", ] diff --git a/rust-runtime/aws-smithy-experimental/Cargo.toml b/rust-runtime/aws-smithy-experimental/Cargo.toml new file mode 100644 index 00000000000..5203bcebb6b --- /dev/null +++ b/rust-runtime/aws-smithy-experimental/Cargo.toml @@ -0,0 +1,57 @@ +[package] +name = "aws-smithy-experimental" +version = "0.1.0" +authors = ["AWS Rust SDK Team "] +description = "Experiments for the smithy-rs ecosystem" +edition = "2021" +license = "Apache-2.0" +repository = "https://github.com/smithy-lang/smithy-rs" + +[features] +crypto-ring = ["rustls/ring"] +crypto-aws-lc = ["rustls/aws_lc_rs"] + +[dependencies] +aws-smithy-types = { path = "../aws-smithy-types", features = ["http-body-1-x"] } +aws-smithy-runtime-api = { features = ["client", "http-1x"], path = "../aws-smithy-runtime-api" } +aws-smithy-runtime = { path = "../aws-smithy-runtime"} +aws-smithy-async = { path = "../aws-smithy-async" } +hyper = { version = "1", features = ["client", "http1", "http2"] } +pin-project-lite = "0.2.13" +hyper-util = "0.1.3" +http = "1" +tokio = "1" +hyper-rustls = { version = "0.26", features = ["http2", "http1"] } +rustls = { version = "0.22.2", default-features = false } +h2 = "0.4" +once_cell = "1.18.0" +tracing = "0.1.40" +tower = "0.4.1" +futures = "0.3.29" + +[dev-dependencies] +aws-smithy-async = { path = "../aws-smithy-async", features = ["rt-tokio", "test-util"] } +aws-smithy-runtime = { path = "../aws-smithy-runtime", features = ["client", "test-util", "connector-hyper-0-14-x"]} +tokio = { version = "1", features = ["full", "test-util"]} + +[[example]] +name = "client-ring" +required-features = ["crypto-ring"] +doc-scrape-examples = true + +[[example]] +name = "client-aws-lc" +required-features = ["crypto-aws-lc"] +doc-scrape-examples = true + +[[example]] +name = "custom-dns" +required-features = ["crypto-ring"] +doc-scrape-examples = true + +[package.metadata.docs.rs] +all-features = true +targets = ["x86_64-unknown-linux-gnu"] +cargo-args = ["-Zunstable-options", "-Zrustdoc-scrape-examples"] +rustdoc-args = ["--cfg", "docsrs"] +# End of docs.rs metadata diff --git a/rust-runtime/aws-smithy-experimental/LICENSE b/rust-runtime/aws-smithy-experimental/LICENSE new file mode 100644 index 00000000000..67db8588217 --- /dev/null +++ b/rust-runtime/aws-smithy-experimental/LICENSE @@ -0,0 +1,175 @@ + + Apache License + Version 2.0, January 2004 + http://www.apache.org/licenses/ + + TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + + 1. Definitions. + + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. + + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. + + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. + + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. + + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. + + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. + + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). + + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. + + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." + + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + + 2. Grant of Copyright License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + + 3. Grant of Patent License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + + 4. Redistribution. You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative Works, in at least one + of the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. The contents + of the NOTICE file are for informational purposes only and + do not modify the License. You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. + + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + + 5. Submission of Contributions. Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + + 6. Trademarks. This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + + 7. Disclaimer of Warranty. Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + + 8. Limitation of Liability. In no event and under no legal theory, + whether in tort (including negligence), contract, or otherwise, + unless required by applicable law (such as deliberate and grossly + negligent acts) or agreed to in writing, shall any Contributor be + liable to You for damages, including any direct, indirect, special, + incidental, or consequential damages of any character arising as a + result of this License or out of the use or inability to use the + Work (including but not limited to damages for loss of goodwill, + work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses), even if such Contributor + has been advised of the possibility of such damages. + + 9. Accepting Warranty or Additional Liability. While redistributing + the Work or Derivative Works thereof, You may choose to offer, + and charge a fee for, acceptance of support, warranty, indemnity, + or other liability obligations and/or rights consistent with this + License. However, in accepting such obligations, You may act only + on Your own behalf and on Your sole responsibility, not on behalf + of any other Contributor, and only if You agree to indemnify, + defend, and hold each Contributor harmless for any liability + incurred by, or claims asserted against, such Contributor by reason + of your accepting any such warranty or additional liability. diff --git a/rust-runtime/aws-smithy-experimental/README.md b/rust-runtime/aws-smithy-experimental/README.md new file mode 100644 index 00000000000..1c5b8055974 --- /dev/null +++ b/rust-runtime/aws-smithy-experimental/README.md @@ -0,0 +1,14 @@ +# aws-smithy-experimental + +Staging ground for experimental new features in the smithy-rs ecosystem. + +### Hyper 1.0 Support +This crate adds support for Hyper 1.0 (see [examples](./examples)). There a few blockers before stablization: +1. Moving to `rustls` 0.23 to take advantage of FIPS support. This is blocked on `hyper-rustls` being upgraded. +2. Expose an API for providing a custom connector. Currently that API is not exposed because a shim layer is needed to avoid taking a hard dependency on hyper-util. +3. Add support for poisoning connections in the connection pool. This API needs to be either backported into hyper-util or we need to establish our own client. + + + +This crate is part of the [AWS SDK for Rust](https://awslabs.github.io/aws-sdk-rust/) and the [smithy-rs](https://github.com/smithy-lang/smithy-rs) code generator. + diff --git a/rust-runtime/aws-smithy-experimental/examples/client-aws-lc.rs b/rust-runtime/aws-smithy-experimental/examples/client-aws-lc.rs new file mode 100644 index 00000000000..7897f6bacfe --- /dev/null +++ b/rust-runtime/aws-smithy-experimental/examples/client-aws-lc.rs @@ -0,0 +1,12 @@ +/* + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * SPDX-License-Identifier: Apache-2.0 + */ + +use aws_smithy_experimental::hyper_1_0::{CryptoMode, HyperClientBuilder}; + +fn main() { + let _client = HyperClientBuilder::new() + .crypto_mode(CryptoMode::AwsLc) + .build_https(); +} diff --git a/rust-runtime/aws-smithy-experimental/examples/client-ring.rs b/rust-runtime/aws-smithy-experimental/examples/client-ring.rs new file mode 100644 index 00000000000..f7919ceaff1 --- /dev/null +++ b/rust-runtime/aws-smithy-experimental/examples/client-ring.rs @@ -0,0 +1,12 @@ +/* + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * SPDX-License-Identifier: Apache-2.0 + */ + +use aws_smithy_experimental::hyper_1_0::{CryptoMode, HyperClientBuilder}; + +fn main() { + let _client = HyperClientBuilder::new() + .crypto_mode(CryptoMode::Ring) + .build_https(); +} diff --git a/rust-runtime/aws-smithy-experimental/examples/custom-dns.rs b/rust-runtime/aws-smithy-experimental/examples/custom-dns.rs new file mode 100644 index 00000000000..45768e5fcc4 --- /dev/null +++ b/rust-runtime/aws-smithy-experimental/examples/custom-dns.rs @@ -0,0 +1,23 @@ +/* + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * SPDX-License-Identifier: Apache-2.0 + */ + +use aws_smithy_experimental::hyper_1_0::{CryptoMode, HyperClientBuilder}; +use aws_smithy_runtime_api::client::dns::{DnsFuture, ResolveDns}; +use std::net::{IpAddr, Ipv4Addr}; + +#[derive(Debug, Clone)] +struct StaticResolver; + +impl ResolveDns for StaticResolver { + fn resolve_dns<'a>(&'a self, _name: &'a str) -> DnsFuture<'a> { + DnsFuture::ready(Ok(vec![IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1))])) + } +} + +fn main() { + let _client = HyperClientBuilder::new() + .crypto_mode(CryptoMode::Ring) + .build_with_resolver(StaticResolver); +} diff --git a/rust-runtime/aws-smithy-experimental/external-types.toml b/rust-runtime/aws-smithy-experimental/external-types.toml new file mode 100644 index 00000000000..42f43601bae --- /dev/null +++ b/rust-runtime/aws-smithy-experimental/external-types.toml @@ -0,0 +1,4 @@ +allowed_external_types = [ + "aws_smithy_runtime_api::*", + "aws_smithy_async::*" +] diff --git a/rust-runtime/aws-smithy-experimental/src/hyper_1_0.rs b/rust-runtime/aws-smithy-experimental/src/hyper_1_0.rs new file mode 100644 index 00000000000..31b45374a36 --- /dev/null +++ b/rust-runtime/aws-smithy-experimental/src/hyper_1_0.rs @@ -0,0 +1,1223 @@ +/* + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * SPDX-License-Identifier: Apache-2.0 + */ + +use std::collections::HashMap; +use std::error::Error; +use std::fmt::Debug; +use std::future::Future; +use std::net::SocketAddr; +use std::pin::Pin; +use std::sync::RwLock; +use std::task::{Context, Poll}; +use std::time::Duration; +use std::{fmt, vec}; + +use client::connect::Connection; +use h2::Reason; +use http::Uri; +use hyper::rt::{Read, Write}; +use hyper_util::client::legacy as client; +use hyper_util::client::legacy::connect::dns::Name; +use hyper_util::client::legacy::connect::Connect; +use hyper_util::rt::TokioExecutor; +use rustls::crypto::CryptoProvider; + +use aws_smithy_async::future::timeout::TimedOutError; +use aws_smithy_async::rt::sleep::{default_async_sleep, AsyncSleep, SharedAsyncSleep}; +use aws_smithy_runtime_api::box_error::BoxError; +use aws_smithy_runtime_api::client::dns::ResolveDns; +use aws_smithy_runtime_api::client::http::{ + HttpClient, HttpConnector, HttpConnectorFuture, HttpConnectorSettings, SharedHttpClient, + SharedHttpConnector, +}; +use aws_smithy_runtime_api::client::orchestrator::{HttpRequest, HttpResponse}; +use aws_smithy_runtime_api::client::result::ConnectorError; +use aws_smithy_runtime_api::client::runtime_components::{ + RuntimeComponents, RuntimeComponentsBuilder, +}; +use aws_smithy_runtime_api::shared::IntoShared; +use aws_smithy_types::body::SdkBody; +use aws_smithy_types::config_bag::ConfigBag; +use aws_smithy_types::error::display::DisplayErrorContext; +use aws_smithy_types::retry::ErrorKind; + +use crate::hyper_1_0::timeout_middleware::{ConnectTimeout, HttpTimeoutError}; +#[derive(Debug, Eq, PartialEq, Clone, Copy)] +pub enum CryptoMode { + #[cfg(feature = "crypto-ring")] + Ring, + #[cfg(feature = "crypto-aws-lc")] + AwsLc, +} + +impl CryptoMode { + fn provider(self) -> CryptoProvider { + match self { + #[cfg(feature = "crypto-aws-lc")] + CryptoMode::AwsLc => rustls::crypto::aws_lc_rs::default_provider(), + + #[cfg(feature = "crypto-ring")] + CryptoMode::Ring => rustls::crypto::ring::default_provider(), + } + } +} + +/// A bridge that allows our `ResolveDns` trait to work with Hyper's `Resolver` interface (based on tower) +#[derive(Clone)] +struct HyperUtilResolver { + resolver: R, +} + +impl tower::Service for HyperUtilResolver { + type Response = vec::IntoIter; + type Error = Box; + type Future = Pin> + Send>>; + + fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll> { + Poll::Ready(Ok(())) + } + + fn call(&mut self, req: Name) -> Self::Future { + let resolver = self.resolver.clone(); + Box::pin(async move { + let dns_entries = resolver.resolve_dns(req.as_str()).await?; + Ok(dns_entries + .into_iter() + .map(|ip_addr| SocketAddr::new(ip_addr, 0)) + .collect::>() + .into_iter()) + }) + } +} + +#[allow(unused_imports)] +mod cached_connectors { + + use client::connect::HttpConnector; + use hyper_rustls::HttpsConnector; + use hyper_util::client::legacy as client; + use hyper_util::client::legacy::connect::dns::GaiResolver; + + use crate::hyper_1_0::build_connector::make_tls; + use crate::hyper_1_0::{CryptoMode, Inner}; + + #[cfg(feature = "crypto-ring")] + pub(crate) static HTTPS_NATIVE_ROOTS_RING: once_cell::sync::Lazy< + HttpsConnector, + > = once_cell::sync::Lazy::new(|| make_tls(GaiResolver::new(), CryptoMode::Ring.provider())); + + #[cfg(feature = "crypto-aws-lc")] + pub(crate) static HTTPS_NATIVE_ROOTS_AWS_LC: once_cell::sync::Lazy< + HttpsConnector, + > = once_cell::sync::Lazy::new(|| make_tls(GaiResolver::new(), CryptoMode::AwsLc.provider())); + + pub(super) fn cached_https(mode: Inner) -> hyper_rustls::HttpsConnector { + match mode { + #[cfg(feature = "crypto-ring")] + Inner::Standard(CryptoMode::Ring) => HTTPS_NATIVE_ROOTS_RING.clone(), + #[cfg(feature = "crypto-aws-lc")] + Inner::Standard(CryptoMode::AwsLc) => HTTPS_NATIVE_ROOTS_AWS_LC.clone(), + #[allow(unreachable_patterns)] + Inner::Standard(_) => unreachable!("unexpected mode"), + Inner::Custom(provider) => make_tls(GaiResolver::new(), provider), + } + } +} + +mod build_connector { + use std::sync::Arc; + + use client::connect::HttpConnector; + use hyper_rustls::HttpsConnector; + use hyper_util::client::legacy as client; + use rustls::crypto::CryptoProvider; + + use aws_smithy_runtime_api::client::dns::ResolveDns; + + use crate::hyper_1_0::{HyperUtilResolver, Inner}; + + fn restrict_ciphers(base: CryptoProvider) -> CryptoProvider { + let suites = &[ + rustls::CipherSuite::TLS13_AES_256_GCM_SHA384, + rustls::CipherSuite::TLS13_AES_128_GCM_SHA256, + // TLS1.2 suites + rustls::CipherSuite::TLS_ECDHE_ECDSA_WITH_AES_256_GCM_SHA384, + rustls::CipherSuite::TLS_ECDHE_ECDSA_WITH_AES_128_GCM_SHA256, + rustls::CipherSuite::TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384, + rustls::CipherSuite::TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256, + rustls::CipherSuite::TLS_ECDHE_RSA_WITH_CHACHA20_POLY1305_SHA256, + ]; + let supported_suites = suites + .iter() + .flat_map(|suite| { + base.cipher_suites + .iter() + .find(|s| &s.suite() == suite) + .cloned() + }) + .collect::>(); + CryptoProvider { + cipher_suites: supported_suites, + ..base + } + } + + pub(crate) fn make_tls( + resolver: R, + crypto_provider: CryptoProvider, + ) -> HttpsConnector> { + use hyper_rustls::ConfigBuilderExt; + hyper_rustls::HttpsConnectorBuilder::new() + .with_tls_config( + rustls::ClientConfig::builder_with_provider(Arc::new(restrict_ciphers(crypto_provider))) + //.with_safe_default_kx_groups() + .with_safe_default_protocol_versions() + .expect("Error with the TLS configuration. Please file a bug report under https://github.com/smithy-lang/smithy-rs/issues.") + .with_native_roots().expect("error with TLS configuration.") + .with_no_client_auth() + ) + .https_or_http() + .enable_http1() + .enable_http2() + .wrap_connector(HttpConnector::new_with_resolver(resolver)) + } + + pub(super) fn https_with_resolver( + crypto_provider: Inner, + resolver: R, + ) -> HttpsConnector>> { + make_tls(HyperUtilResolver { resolver }, crypto_provider.provider()) + } +} + +/// [`HttpConnector`] that uses [`hyper`] to make HTTP requests. +/// +/// This connector also implements socket connect and read timeouts. +/// +/// This shouldn't be used directly in most cases. +/// See the docs on [`HyperClientBuilder`] for examples of how +/// to customize the Hyper client. +#[derive(Debug)] +pub struct HyperConnector { + adapter: Box, +} + +impl HyperConnector { + /// Builder for a Hyper connector. + pub fn builder() -> HyperConnectorBuilder { + Default::default() + } +} + +impl HttpConnector for HyperConnector { + fn call(&self, request: HttpRequest) -> HttpConnectorFuture { + self.adapter.call(request) + } +} + +/// Builder for [`HyperConnector`]. +#[derive(Default, Debug)] +pub struct HyperConnectorBuilder { + connector_settings: Option, + sleep_impl: Option, + client_builder: Option, + #[allow(unused)] + crypto: Crypto, +} + +#[derive(Default)] +#[non_exhaustive] +pub struct CryptoUnset {} + +pub struct CryptoProviderSelected { + crypto_provider: Inner, +} + +#[derive(Clone)] +enum Inner { + Standard(CryptoMode), + #[allow(dead_code)] + Custom(CryptoProvider), +} + +impl Inner { + fn provider(&self) -> CryptoProvider { + match self { + Inner::Standard(mode) => mode.provider(), + Inner::Custom(provider) => provider.clone(), + } + } +} + +#[cfg(any(feature = "crypto-aws-lc", feature = "crypto-ring"))] +impl HyperConnectorBuilder { + pub fn build_from_resolver( + self, + resolver: R, + ) -> HyperConnector { + let connector = + build_connector::https_with_resolver(self.crypto.crypto_provider.clone(), resolver); + self.build(connector) + } +} + +impl HyperConnectorBuilder { + /// Create a [`HyperConnector`] from this builder and a given connector. + pub(crate) fn build(self, tcp_connector: C) -> HyperConnector + where + C: Send + Sync + 'static, + C: Clone, + C: tower::Service, + C::Response: Read + Write + Connection + Send + Sync + Unpin, + C: Connect, + C::Future: Unpin + Send + 'static, + C::Error: Into, + { + let client_builder = + self.client_builder + .unwrap_or(hyper_util::client::legacy::Builder::new( + TokioExecutor::new(), + )); + let sleep_impl = self.sleep_impl.or_else(default_async_sleep); + let (connect_timeout, read_timeout) = self + .connector_settings + .map(|c| (c.connect_timeout(), c.read_timeout())) + .unwrap_or((None, None)); + + let connector = match connect_timeout { + Some(duration) => timeout_middleware::ConnectTimeout::new( + tcp_connector, + sleep_impl + .clone() + .expect("a sleep impl must be provided in order to have a connect timeout"), + duration, + ), + None => timeout_middleware::ConnectTimeout::no_timeout(tcp_connector), + }; + let base = client_builder.build(connector); + let read_timeout = match read_timeout { + Some(duration) => timeout_middleware::HttpReadTimeout::new( + base, + sleep_impl.expect("a sleep impl must be provided in order to have a read timeout"), + duration, + ), + None => timeout_middleware::HttpReadTimeout::no_timeout(base), + }; + HyperConnector { + adapter: Box::new(Adapter { + client: read_timeout, + }), + } + } + + /// Set the async sleep implementation used for timeouts + /// + /// Calling this is only necessary for testing or to use something other than + /// [`default_async_sleep`]. + pub fn sleep_impl(mut self, sleep_impl: impl AsyncSleep + 'static) -> Self { + self.sleep_impl = Some(sleep_impl.into_shared()); + self + } + + /// Set the async sleep implementation used for timeouts + /// + /// Calling this is only necessary for testing or to use something other than + /// [`default_async_sleep`]. + pub fn set_sleep_impl(&mut self, sleep_impl: Option) -> &mut Self { + self.sleep_impl = sleep_impl; + self + } + + /// Configure the HTTP settings for the `HyperAdapter` + pub fn connector_settings(mut self, connector_settings: HttpConnectorSettings) -> Self { + self.connector_settings = Some(connector_settings); + self + } + + /// Configure the HTTP settings for the `HyperAdapter` + pub fn set_connector_settings( + &mut self, + connector_settings: Option, + ) -> &mut Self { + self.connector_settings = connector_settings; + self + } + + /// Override the Hyper client [`Builder`](hyper_util::client::legacy::client::Builder) used to construct this client. + /// + /// This enables changing settings like forcing HTTP2 and modifying other default client behavior. + pub(crate) fn hyper_builder( + mut self, + hyper_builder: hyper_util::client::legacy::Builder, + ) -> Self { + self.set_hyper_builder(Some(hyper_builder)); + self + } + + /// Override the Hyper client [`Builder`](hyper_util::client::legacy::client::Builder) used to construct this client. + /// + /// This enables changing settings like forcing HTTP2 and modifying other default client behavior. + pub(crate) fn set_hyper_builder( + &mut self, + hyper_builder: Option, + ) -> &mut Self { + self.client_builder = hyper_builder; + self + } +} + +/// Adapter to use a Hyper 1.0-based Client as an `HttpConnector` +/// +/// This adapter also enables TCP `CONNECT` and HTTP `READ` timeouts via [`HyperConnector::builder`]. +struct Adapter { + client: timeout_middleware::HttpReadTimeout< + hyper_util::client::legacy::Client, SdkBody>, + >, +} + +impl fmt::Debug for Adapter { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("Adapter") + .field("client", &"** hyper client **") + .finish() + } +} + +/* +/// Extract a smithy connection from a hyper CaptureConnection +fn extract_smithy_connection(capture_conn: &CaptureConnection) -> Option { + let capture_conn = capture_conn.clone(); + if let Some(conn) = capture_conn.clone().connection_metadata().as_ref() { + let mut extensions = Extensions::new(); + conn.get_extras(&mut extensions); + let http_info = extensions.get::(); + let mut builder = ConnectionMetadata::builder() + .proxied(conn.is_proxied()) + .poison_fn(move || match capture_conn.connection_metadata().as_ref() { + Some(conn) => conn.poison(), + None => tracing::trace!("no connection existed to poison"), + }); + + builder + .set_local_addr(http_info.map(|info| info.local_addr())) + .set_remote_addr(http_info.map(|info| info.remote_addr())); + + let smithy_connection = builder.build(); + + Some(smithy_connection) + } else { + None + } +}*/ + +impl HttpConnector for Adapter +where + C: Clone + Send + Sync + 'static, + C: tower::Service, + C::Response: Connection + Read + Write + Unpin + 'static, + ConnectTimeout: Connect, + C::Future: Unpin + Send + 'static, + C::Error: Into, +{ + fn call(&self, request: HttpRequest) -> HttpConnectorFuture { + let request = match request.try_into_http1x() { + Ok(request) => request, + Err(err) => { + return HttpConnectorFuture::ready(Err(ConnectorError::user(err.into()))); + } + }; + /*let capture_connection = capture_connection(&mut request); + if let Some(capture_smithy_connection) = + request.extensions().get::() + { + capture_smithy_connection + .set_connection_retriever(move || extract_smithy_connection(&capture_connection)); + }*/ + let mut client = self.client.clone(); + use tower::Service; + let fut = client.call(request); + HttpConnectorFuture::new(async move { + let response = fut + .await + .map_err(downcast_error)? + .map(SdkBody::from_body_1_x); + match HttpResponse::try_from(response) { + Ok(response) => Ok(response), + Err(err) => Err(ConnectorError::other(err.into(), None)), + } + }) + } +} + +/// Downcast errors coming out of hyper into an appropriate `ConnectorError` +fn downcast_error(err: BoxError) -> ConnectorError { + // is a `TimedOutError` (from aws_smithy_async::timeout) in the chain? if it is, this is a timeout + if find_source::(err.as_ref()).is_some() { + return ConnectorError::timeout(err); + } + // is the top of chain error actually already a `ConnectorError`? return that directly + let err = match err.downcast::() { + Ok(connector_error) => return *connector_error, + Err(box_error) => box_error, + }; + // generally, the top of chain will probably be a hyper error. Go through a set of hyper specific + // error classifications + let err = match find_source::(err.as_ref()) { + Some(hyper_error) => return to_connector_error(hyper_error)(err), + None => err, + }; + + // otherwise, we have no idea! + ConnectorError::other(err, None) +} + +/// Convert a [`hyper::Error`] into a [`ConnectorError`] +fn to_connector_error(err: &hyper::Error) -> fn(BoxError) -> ConnectorError { + if err.is_timeout() || find_source::(err).is_some() { + return ConnectorError::timeout; + } + if err.is_user() { + return ConnectorError::user; + } + if err.is_closed() || err.is_canceled() || find_source::(err).is_some() { + return ConnectorError::io; + } + // We sometimes receive this from S3: hyper::Error(IncompleteMessage) + if err.is_incomplete_message() { + return |err: BoxError| ConnectorError::other(err.into(), Some(ErrorKind::TransientError)); + } + + if let Some(h2_err) = find_source::(err) { + if h2_err.is_go_away() + || (h2_err.is_reset() && h2_err.reason() == Some(Reason::REFUSED_STREAM)) + { + return ConnectorError::io; + } + } + + tracing::warn!(err = %DisplayErrorContext(&err), "unrecognized error from Hyper. If this error should be retried, please file an issue."); + |err: BoxError| ConnectorError::other(err.into(), None) +} + +fn find_source<'a, E: Error + 'static>(err: &'a (dyn Error + 'static)) -> Option<&'a E> { + let mut next = Some(err); + while let Some(err) = next { + if let Some(matching_err) = err.downcast_ref::() { + return Some(matching_err); + } + next = err.source(); + } + None +} + +// TODO(https://github.com/awslabs/aws-sdk-rust/issues/1090): CacheKey must also include ptr equality to RuntimeComponents +#[derive(Clone, Debug, Eq, PartialEq, Hash)] +struct CacheKey { + connect_timeout: Option, + read_timeout: Option, +} + +impl From<&HttpConnectorSettings> for CacheKey { + fn from(value: &HttpConnectorSettings) -> Self { + Self { + connect_timeout: value.connect_timeout(), + read_timeout: value.read_timeout(), + } + } +} + +struct HyperClient { + connector_cache: RwLock>, + client_builder: hyper_util::client::legacy::Builder, + tcp_connector_fn: F, +} + +impl fmt::Debug for HyperClient { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("HyperClient") + .field("connector_cache", &self.connector_cache) + .field("client_builder", &self.client_builder) + .finish() + } +} + +impl HttpClient for HyperClient +where + F: Fn() -> C + Send + Sync, + C: Clone + Send + Sync + 'static, + C: tower::Service, + C::Response: Connection + Read + Write + Send + Sync + Unpin + 'static, + C::Future: Unpin + Send + 'static, + C::Error: Into, +{ + fn http_connector( + &self, + settings: &HttpConnectorSettings, + components: &RuntimeComponents, + ) -> SharedHttpConnector { + let key = CacheKey::from(settings); + let mut connector = self.connector_cache.read().unwrap().get(&key).cloned(); + if connector.is_none() { + let mut cache = self.connector_cache.write().unwrap(); + // Short-circuit if another thread already wrote a connector to the cache for this key + if !cache.contains_key(&key) { + let mut builder = HyperConnector::builder() + .hyper_builder(self.client_builder.clone()) + .connector_settings(settings.clone()); + builder.set_sleep_impl(components.sleep_impl()); + + let start = components.time_source().map(|ts| ts.now()); + let tcp_connector = (self.tcp_connector_fn)(); + let end = components.time_source().map(|ts| ts.now()); + if let (Some(start), Some(end)) = (start, end) { + if let Ok(elapsed) = end.duration_since(start) { + tracing::debug!("new TCP connector created in {:?}", elapsed); + } + } + let connector = SharedHttpConnector::new(builder.build(tcp_connector)); + cache.insert(key.clone(), connector); + } + connector = cache.get(&key).cloned(); + } + + connector.expect("cache populated above") + } + + fn validate_base_client_config( + &self, + _: &RuntimeComponentsBuilder, + _: &ConfigBag, + ) -> Result<(), BoxError> { + // Initialize the TCP connector at this point so that native certs load + // at client initialization time instead of upon first request. We do it + // here rather than at construction so that it won't run if this is not + // the selected HTTP client for the base config (for example, if this was + // the default HTTP client, and it was overridden by a later plugin). + let _ = (self.tcp_connector_fn)(); + Ok(()) + } +} + +/// Builder for a hyper-backed [`HttpClient`] implementation. +/// +/// This builder can be used to customize the underlying TCP connector used, as well as +/// hyper client configuration. +/// +/// # Examples +/// +/// Construct a Hyper client with the RusTLS TLS implementation. +/// This can be useful when you want to share a Hyper connector between multiple +/// generated Smithy clients. +#[derive(Clone, Default, Debug)] +pub struct HyperClientBuilder { + client_builder: Option, + crypto_provider: Crypto, +} + +impl HyperClientBuilder { + /// Create a hyper client using RusTLS for TLS + /// + /// The trusted certificates will be loaded later when this becomes the selected + /// HTTP client for a Smithy client. + pub fn build_https(self) -> SharedHttpClient { + let crypto = self.crypto_provider.crypto_provider; + build_with_fn(self.client_builder, move || { + cached_connectors::cached_https(crypto.clone()) + }) + } + + /// Create a hyper client using a custom DNS resolver + pub fn build_with_resolver( + self, + resolver: impl ResolveDns + Clone + 'static, + ) -> SharedHttpClient { + build_with_fn(self.client_builder, move || { + build_connector::https_with_resolver( + self.crypto_provider.crypto_provider.clone(), + resolver.clone(), + ) + }) + } +} + +impl HyperClientBuilder { + /// Creates a new builder. + pub fn new() -> Self { + Self::default() + } + + pub fn crypto_mode(self, provider: CryptoMode) -> HyperClientBuilder { + HyperClientBuilder { + client_builder: self.client_builder, + crypto_provider: CryptoProviderSelected { + crypto_provider: Inner::Standard(provider), + }, + } + } + + /// This interface will be broken in the future + /// + /// This exposes `CryptoProvider` from `rustls` directly and this API has no stability guarantee. + #[cfg(crypto_unstable)] + pub fn crypto_provider_unstable( + self, + provider: CryptoProvider, + ) -> HyperClientBuilder { + HyperClientBuilder { + client_builder: self.client_builder, + crypto_provider: CryptoProviderSelected { + crypto_provider: Inner::Custom(provider), + }, + } + } +} + +fn build_with_fn( + client_builder: Option, + tcp_connector_fn: F, +) -> SharedHttpClient +where + F: Fn() -> C + Send + Sync + 'static, + C: Clone + Send + Sync + 'static, + C: tower::Service, + C::Response: Connection + Read + Write + Send + Sync + Unpin + 'static, + C::Future: Unpin + Send + 'static, + C::Error: Into, + C: Connect, +{ + SharedHttpClient::new(HyperClient { + connector_cache: RwLock::new(HashMap::new()), + client_builder: client_builder + .unwrap_or_else(|| hyper_util::client::legacy::Builder::new(TokioExecutor::new())), + tcp_connector_fn, + }) +} + +mod timeout_middleware { + use std::error::Error; + use std::fmt::Formatter; + use std::future::Future; + use std::pin::Pin; + use std::task::{Context, Poll}; + use std::time::Duration; + + use http::Uri; + use pin_project_lite::pin_project; + + use aws_smithy_async::future::timeout::{TimedOutError, Timeout}; + use aws_smithy_async::rt::sleep::Sleep; + use aws_smithy_async::rt::sleep::{AsyncSleep, SharedAsyncSleep}; + use aws_smithy_runtime_api::box_error::BoxError; + + #[derive(Debug)] + pub(crate) struct HttpTimeoutError { + kind: &'static str, + duration: Duration, + } + + impl std::fmt::Display for HttpTimeoutError { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + write!( + f, + "{} timeout occurred after {:?}", + self.kind, self.duration + ) + } + } + + impl Error for HttpTimeoutError { + // We implement the `source` function as returning a `TimedOutError` because when `downcast_error` + // or `find_source` is called with an `HttpTimeoutError` (or another error wrapping an `HttpTimeoutError`) + // this method will be checked to determine if it's a timeout-related error. + fn source(&self) -> Option<&(dyn Error + 'static)> { + Some(&TimedOutError) + } + } + + /// Timeout wrapper that will timeout on the initial TCP connection + /// + /// # Stability + /// This interface is unstable. + #[derive(Clone, Debug)] + pub(super) struct ConnectTimeout { + inner: I, + timeout: Option<(SharedAsyncSleep, Duration)>, + } + + impl ConnectTimeout { + /// Create a new `ConnectTimeout` around `inner`. + /// + /// Typically, `I` will implement [`hyper_0_14::client::connect::Connect`]. + pub(crate) fn new(inner: I, sleep: SharedAsyncSleep, timeout: Duration) -> Self { + Self { + inner, + timeout: Some((sleep, timeout)), + } + } + + pub(crate) fn no_timeout(inner: I) -> Self { + Self { + inner, + timeout: None, + } + } + } + + #[derive(Clone, Debug)] + pub(crate) struct HttpReadTimeout { + inner: I, + timeout: Option<(SharedAsyncSleep, Duration)>, + } + + impl HttpReadTimeout { + /// Create a new `HttpReadTimeout` around `inner`. + /// + /// Typically, `I` will implement [`hyper_0_14::service::Service>`]. + pub(crate) fn new(inner: I, sleep: SharedAsyncSleep, timeout: Duration) -> Self { + Self { + inner, + timeout: Some((sleep, timeout)), + } + } + + pub(crate) fn no_timeout(inner: I) -> Self { + Self { + inner, + timeout: None, + } + } + } + + pin_project! { + /// Timeout future for Tower services + /// + /// Timeout future to handle timing out, mapping errors, and the possibility of not timing out + /// without incurring an additional allocation for each timeout layer. + #[project = MaybeTimeoutFutureProj] + pub enum MaybeTimeoutFuture { + Timeout { + #[pin] + timeout: Timeout, + error_type: &'static str, + duration: Duration, + }, + NoTimeout { + #[pin] + future: F + } + } + } + + impl Future for MaybeTimeoutFuture + where + F: Future>, + E: Into, + { + type Output = Result; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let (timeout_future, kind, &mut duration) = match self.project() { + MaybeTimeoutFutureProj::NoTimeout { future } => { + return future.poll(cx).map_err(|err| err.into()); + } + MaybeTimeoutFutureProj::Timeout { + timeout, + error_type, + duration, + } => (timeout, error_type, duration), + }; + match timeout_future.poll(cx) { + Poll::Ready(Ok(response)) => Poll::Ready(response.map_err(|err| err.into())), + Poll::Ready(Err(_timeout)) => { + Poll::Ready(Err(HttpTimeoutError { kind, duration }.into())) + } + Poll::Pending => Poll::Pending, + } + } + } + + impl tower::Service for ConnectTimeout + where + I: tower::Service, + I::Error: Into, + { + type Response = I::Response; + type Error = BoxError; + type Future = MaybeTimeoutFuture; + + fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { + self.inner.poll_ready(cx).map_err(|err| err.into()) + } + + fn call(&mut self, req: Uri) -> Self::Future { + match &self.timeout { + Some((sleep, duration)) => { + let sleep = sleep.sleep(*duration); + MaybeTimeoutFuture::Timeout { + timeout: Timeout::new(self.inner.call(req), sleep), + error_type: "HTTP connect", + duration: *duration, + } + } + None => MaybeTimeoutFuture::NoTimeout { + future: self.inner.call(req), + }, + } + } + } + + impl tower::Service> for HttpReadTimeout + where + I: tower::Service>, + I::Error: Send + Sync + Error + 'static, + { + type Response = I::Response; + type Error = BoxError; + type Future = MaybeTimeoutFuture; + + fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { + self.inner.poll_ready(cx).map_err(|err| err.into()) + } + + fn call(&mut self, req: http::Request) -> Self::Future { + match &self.timeout { + Some((sleep, duration)) => { + let sleep = sleep.sleep(*duration); + MaybeTimeoutFuture::Timeout { + timeout: Timeout::new(self.inner.call(req), sleep), + error_type: "HTTP read", + duration: *duration, + } + } + None => MaybeTimeoutFuture::NoTimeout { + future: self.inner.call(req), + }, + } + } + } + + #[cfg(test)] + pub(crate) mod test { + use std::time::Duration; + + use hyper::rt::ReadBufCursor; + use hyper_util::client::legacy::connect::Connected; + use hyper_util::rt::TokioIo; + use tokio::net::TcpStream; + + use aws_smithy_async::assert_elapsed; + use aws_smithy_async::future::never::Never; + use aws_smithy_async::rt::sleep::{SharedAsyncSleep, TokioSleep}; + use aws_smithy_types::error::display::DisplayErrorContext; + + use super::super::*; + use super::*; + + #[allow(unused)] + fn connect_timeout_is_correct() { + is_send_sync::>(); + } + + #[allow(unused)] + fn is_send_sync() {} + + /// A service that will never return whatever it is you want + /// + /// Returned futures will return Pending forever + #[non_exhaustive] + #[derive(Clone, Default, Debug)] + pub(crate) struct NeverConnects; + impl tower::Service for NeverConnects { + type Response = TokioIo; + type Error = ConnectorError; + type Future = Pin> + Send>>; + + fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll> { + Poll::Ready(Ok(())) + } + + fn call(&mut self, _uri: Uri) -> Self::Future { + Box::pin(async move { + Never::new().await; + unreachable!() + }) + } + } + + /// A service that will connect but never send any data + #[derive(Clone, Debug, Default)] + struct NeverReplies; + impl tower::Service for NeverReplies { + type Response = EmptyStream; + type Error = BoxError; + type Future = std::future::Ready>; + + fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { + Poll::Ready(Ok(())) + } + + fn call(&mut self, _req: Uri) -> Self::Future { + std::future::ready(Ok(EmptyStream)) + } + } + + /// A stream that will never return or accept any data + #[non_exhaustive] + #[derive(Debug, Default)] + struct EmptyStream; + impl Read for EmptyStream { + fn poll_read( + self: Pin<&mut Self>, + _cx: &mut Context<'_>, + _buf: ReadBufCursor<'_>, + ) -> Poll> { + Poll::Pending + } + } + impl Write for EmptyStream { + fn poll_write( + self: Pin<&mut Self>, + _cx: &mut Context<'_>, + _buf: &[u8], + ) -> Poll> { + Poll::Pending + } + + fn poll_flush( + self: Pin<&mut Self>, + _cx: &mut Context<'_>, + ) -> Poll> { + Poll::Pending + } + + fn poll_shutdown( + self: Pin<&mut Self>, + _cx: &mut Context<'_>, + ) -> Poll> { + Poll::Pending + } + } + impl Connection for EmptyStream { + fn connected(&self) -> Connected { + Connected::new() + } + } + + #[tokio::test] + async fn http_connect_timeout_works() { + let tcp_connector = NeverConnects::default(); + let connector_settings = HttpConnectorSettings::builder() + .connect_timeout(Duration::from_secs(1)) + .build(); + let hyper = HyperConnector::builder() + .connector_settings(connector_settings) + .sleep_impl(SharedAsyncSleep::new(TokioSleep::new())) + .build(tcp_connector) + .adapter; + let now = tokio::time::Instant::now(); + tokio::time::pause(); + let resp = hyper + .call(HttpRequest::get("https://static-uri.com").unwrap()) + .await + .unwrap_err(); + assert!( + resp.is_timeout(), + "expected resp.is_timeout() to be true but it was false, resp == {:?}", + resp + ); + let message = DisplayErrorContext(&resp).to_string(); + let expected = + "timeout: client error (Connect): HTTP connect timeout occurred after 1s"; + assert!( + message.contains(expected), + "expected '{message}' to contain '{expected}'" + ); + assert_elapsed!(now, Duration::from_secs(1)); + } + + #[tokio::test] + async fn http_read_timeout_works() { + let tcp_connector = NeverReplies; + let connector_settings = HttpConnectorSettings::builder() + .connect_timeout(Duration::from_secs(1)) + .read_timeout(Duration::from_secs(2)) + .build(); + let hyper = HyperConnector::builder() + .connector_settings(connector_settings) + .sleep_impl(SharedAsyncSleep::new(TokioSleep::new())) + .build(tcp_connector) + .adapter; + let now = tokio::time::Instant::now(); + tokio::time::pause(); + let err = hyper + .call(HttpRequest::get("https://fake-uri.com").unwrap()) + .await + .unwrap_err(); + assert!( + err.is_timeout(), + "expected err.is_timeout() to be true but it was false, err == {err:?}", + ); + let message = format!("{}", DisplayErrorContext(&err)); + let expected = "timeout: HTTP read timeout occurred after 2s"; + assert!( + message.contains(expected), + "expected '{message}' to contain '{expected}'" + ); + assert_elapsed!(now, Duration::from_secs(2)); + } + } +} + +#[cfg(test)] +mod test { + use std::io::{Error, ErrorKind}; + use std::pin::Pin; + use std::sync::atomic::{AtomicU32, Ordering}; + use std::sync::Arc; + use std::task::{Context, Poll}; + + use http::Uri; + use hyper::rt::ReadBufCursor; + use hyper_util::client::legacy::connect::Connected; + + use aws_smithy_async::time::SystemTimeSource; + use aws_smithy_runtime_api::client::runtime_components::RuntimeComponentsBuilder; + + use crate::hyper_1_0::timeout_middleware::test::NeverConnects; + + use super::*; + + #[tokio::test] + async fn connector_selection() { + // Create a client that increments a count every time it creates a new HyperConnector + let creation_count = Arc::new(AtomicU32::new(0)); + let http_client = build_with_fn(None, { + let count = creation_count.clone(); + move || { + count.fetch_add(1, Ordering::Relaxed); + NeverConnects + } + }); + + // This configuration should result in 4 separate connectors with different timeout settings + let settings = [ + HttpConnectorSettings::builder() + .connect_timeout(Duration::from_secs(3)) + .build(), + HttpConnectorSettings::builder() + .read_timeout(Duration::from_secs(3)) + .build(), + HttpConnectorSettings::builder() + .connect_timeout(Duration::from_secs(3)) + .read_timeout(Duration::from_secs(3)) + .build(), + HttpConnectorSettings::builder() + .connect_timeout(Duration::from_secs(5)) + .read_timeout(Duration::from_secs(3)) + .build(), + ]; + + // Kick off thousands of parallel tasks that will try to create a connector + let components = RuntimeComponentsBuilder::for_tests() + .with_time_source(Some(SystemTimeSource::new())) + .build() + .unwrap(); + let mut handles = Vec::new(); + for setting in &settings { + for _ in 0..1000 { + let client = http_client.clone(); + handles.push(tokio::spawn({ + let setting = setting.clone(); + let components = components.clone(); + async move { + let _ = client.http_connector(&setting, &components); + } + })); + } + } + for handle in handles { + handle.await.unwrap(); + } + + // Verify only 4 connectors were created amidst the chaos + assert_eq!(4, creation_count.load(Ordering::Relaxed)); + } + + #[tokio::test] + async fn hyper_io_error() { + let connector = TestConnection { + inner: HangupStream, + }; + let adapter = HyperConnector::builder().build(connector).adapter; + let err = adapter + .call(HttpRequest::get("https://socket-hangup.com").unwrap()) + .await + .expect_err("socket hangup"); + assert!(err.is_io(), "unexpected error type: {:?}", err); + } + + // ---- machinery to make a Hyper connector that responds with an IO Error + #[derive(Clone)] + struct HangupStream; + + impl Connection for HangupStream { + fn connected(&self) -> Connected { + Connected::new() + } + } + + impl Read for HangupStream { + fn poll_read( + self: Pin<&mut Self>, + _cx: &mut Context<'_>, + _buf: ReadBufCursor<'_>, + ) -> Poll> { + Poll::Ready(Err(Error::new( + ErrorKind::ConnectionReset, + "connection reset", + ))) + } + } + + impl Write for HangupStream { + fn poll_write( + self: Pin<&mut Self>, + _cx: &mut Context<'_>, + _buf: &[u8], + ) -> Poll> { + Poll::Pending + } + + fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll> { + Poll::Pending + } + + fn poll_shutdown(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll> { + Poll::Pending + } + } + + #[derive(Clone)] + struct TestConnection { + inner: T, + } + + impl tower::Service for TestConnection + where + T: Clone + Connection, + { + type Response = T; + type Error = BoxError; + type Future = std::future::Ready>; + + fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll> { + Poll::Ready(Ok(())) + } + + fn call(&mut self, _req: Uri) -> Self::Future { + std::future::ready(Ok(self.inner.clone())) + } + } +} diff --git a/rust-runtime/aws-smithy-experimental/src/lib.rs b/rust-runtime/aws-smithy-experimental/src/lib.rs new file mode 100644 index 00000000000..fc0bf781e31 --- /dev/null +++ b/rust-runtime/aws-smithy-experimental/src/lib.rs @@ -0,0 +1,10 @@ +/* + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * SPDX-License-Identifier: Apache-2.0 + */ + +/* Automatically managed default lints */ +#![cfg_attr(docsrs, feature(doc_auto_cfg))] +/* End of automatically managed default lints */ + +pub mod hyper_1_0; diff --git a/tools/ci-build/sdk-lints/src/readmes.rs b/tools/ci-build/sdk-lints/src/readmes.rs index 32d235ffa35..76de79c6e6a 100644 --- a/tools/ci-build/sdk-lints/src/readmes.rs +++ b/tools/ci-build/sdk-lints/src/readmes.rs @@ -12,6 +12,7 @@ const CRATES_TO_BE_USED_DIRECTLY: &[&str] = [ "aws-config", "aws-smithy-types-convert", "aws-smithy-mocks-experimental", + "aws-smithy-experimental", ] .as_slice();