Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix Timestream in the orchestrator #2846

Merged
merged 8 commits into from
Jul 14, 2023
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 6 additions & 6 deletions CHANGELOG.next.toml
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ author = "rcoh"
message = "Bump dependency on `lambda_http` by `aws-smithy-http-server` to 0.8.0. This version of `aws-smithy-http-server` is only guaranteed to be compatible with 0.8.0, or semver-compatible versions of 0.8.0 of the `lambda_http` crate. It will not work with versions prior to 0.8.0 _at runtime_, making requests to your smithy-rs service unroutable, so please make sure you're running your service in a compatible configuration"
author = "david-perez"
references = ["smithy-rs#2676", "smithy-rs#2685"]
meta = { "breaking" = true, "tada" = false, "bug" = false }
meta = { "breaking" = true, "tada" = false, "bug" = false, target = "server" }

[[smithy-rs]]
message = """Remove `PollError` from an operations `Service::Error`.
Expand All @@ -141,9 +141,9 @@ meta = { "breaking" = true, "tada" = false, "bug" = false, "target" = "server" }
author = "hlbarber"

[[aws-sdk-rust]]
message = "The SDK has added support for timestreamwrite and timestreamquery. Support for these services is considered experimental at this time. In order to use these services, you MUST call `.enable_endpoint_discovery()` on the `Client` after construction."
message = "The SDK has added support for timestreamwrite and timestreamquery. Support for these services is considered experimental at this time. In order to use these services, you MUST call `.with_endpoint_discovery_enabled()` on the `Client` after construction."
meta = { "breaking" = false, "tada" = true, "bug" = false }
references = ["smithy-rs#2707", "aws-sdk-rust#114"]
references = ["smithy-rs#2707", "aws-sdk-rust#114", "smithy-rs#2846"]
author = "rcoh"

[[smithy-rs]]
Expand Down Expand Up @@ -197,7 +197,7 @@ filter_by_operation_id(plugin, |id| id.absolute() != "namespace#name");
"""
author = "82marbag"
references = ["smithy-rs#2678"]
meta = { "breaking" = true, "tada" = false, "bug" = false }
meta = { "breaking" = true, "tada" = false, "bug" = false, target = "server" }

[[smithy-rs]]
message = "The occurrences of `Arc<dyn ResolveEndpoint>` have now been replaced with `SharedEndpointResolver` in public APIs."
Expand Down Expand Up @@ -532,7 +532,7 @@ let scoped_plugin = Scoped::new::<SomeScope>(plugin);

"""
references = ["smithy-rs#2740", "smithy-rs#2759", "smithy-rs#2779", "smithy-rs#2827"]
meta = { "breaking" = true, "tada" = true, "bug" = false }
meta = { "breaking" = true, "tada" = true, "bug" = false, target = "server" }
author = "hlbarber"

[[smithy-rs]]
Expand Down Expand Up @@ -608,7 +608,7 @@ let plugin = plugin_from_operation_fn(map);
```
"""
references = ["smithy-rs#2740", "smithy-rs#2759", "smithy-rs#2779"]
meta = { "breaking" = true, "tada" = false, "bug" = false }
meta = { "breaking" = true, "tada" = false, "bug" = false, target = "server" }
author = "hlbarber"

[[smithy-rs]]
Expand Down
3 changes: 3 additions & 0 deletions aws/rust-runtime/aws-inlineable/src/endpoint_discovery.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ impl ReloadEndpoint {
pub async fn reload_once(&self) {
match (self.loader)().await {
Ok((endpoint, expiry)) => {
tracing::debug!("caching resolved endpoint: {:?}", (&endpoint, &expiry));
*self.endpoint.lock().unwrap() = Some(ExpiringEndpoint { endpoint, expiry })
}
Err(err) => *self.error.lock().unwrap() = Some(err),
Expand Down Expand Up @@ -128,6 +129,7 @@ where
sleep,
time,
};
tracing::debug!("populating initial endpoint discovery cache");
reloader.reload_once().await;
// if we didn't successfully get an endpoint, bail out so the client knows
// configuration failed to work
Expand All @@ -137,6 +139,7 @@ where

impl EndpointCache {
fn resolve_endpoint(&self) -> aws_smithy_http::endpoint::Result {
tracing::trace!("resolving endpoint from endpoint discovery cache");
self.endpoint
.lock()
.unwrap()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
package software.amazon.smithy.rustsdk.customize.timestream

import software.amazon.smithy.rust.codegen.client.smithy.ClientCodegenContext
import software.amazon.smithy.rust.codegen.client.smithy.ClientRustModule
import software.amazon.smithy.rust.codegen.client.smithy.customize.ClientCodegenDecorator
import software.amazon.smithy.rust.codegen.client.smithy.endpoint.Types
import software.amazon.smithy.rust.codegen.core.rustlang.CargoDependency
Expand All @@ -14,7 +15,6 @@ import software.amazon.smithy.rust.codegen.core.rustlang.Visibility
import software.amazon.smithy.rust.codegen.core.rustlang.rustTemplate
import software.amazon.smithy.rust.codegen.core.rustlang.toType
import software.amazon.smithy.rust.codegen.core.smithy.RuntimeType
import software.amazon.smithy.rust.codegen.core.smithy.RuntimeType.Companion.preludeScope
import software.amazon.smithy.rust.codegen.core.smithy.RustCrate
import software.amazon.smithy.rust.codegen.core.smithy.customize.AdHocCustomization
import software.amazon.smithy.rust.codegen.core.smithy.customize.adhocCustomization
Expand All @@ -25,7 +25,7 @@ import software.amazon.smithy.rustsdk.InlineAwsDependency
/**
* This decorator does two things:
* 1. Adds the `endpoint_discovery` inlineable
* 2. Adds a `enable_endpoint_discovery` method on client that returns a wrapped client with endpoint discovery enabled
* 2. Adds a `with_endpoint_discovery_enabled` method on client that returns a wrapped client with endpoint discovery enabled
*/
class TimestreamDecorator : ClientCodegenDecorator {
override val name: String = "Timestream"
Expand All @@ -38,8 +38,8 @@ class TimestreamDecorator : ClientCodegenDecorator {
rustTemplate(
"""
let config = aws_config::load_from_env().await;
// You MUST call `enable_endpoint_discovery` to produce a working client for this service.
let ${it.clientName} = ${it.crateName}::Client::new(&config).enable_endpoint_discovery().await;
// You MUST call `with_endpoint_discovery_enabled` to produce a working client for this service.
let ${it.clientName} = ${it.crateName}::Client::new(&config).with_endpoint_discovery_enabled().await;
""".replaceIndent(it.indent),
)
},
Expand All @@ -52,7 +52,7 @@ class TimestreamDecorator : ClientCodegenDecorator {
Visibility.PUBLIC,
CargoDependency.Tokio.copy(scope = DependencyScope.Compile, features = setOf("sync")),
)
rustCrate.lib {
rustCrate.withModule(ClientRustModule.client) {
// helper function to resolve an endpoint given a base client
rustTemplate(
"""
Expand All @@ -76,33 +76,40 @@ class TimestreamDecorator : ClientCodegenDecorator {
/// Enable endpoint discovery for this client
///
/// This method MUST be called to construct a working client.
pub async fn enable_endpoint_discovery(self) -> #{Result}<(Self, #{endpoint_discovery}::ReloadEndpoint), #{ResolveEndpointError}> {
let mut new_conf = self.conf().clone();
let sleep = self.conf().sleep_impl().expect("sleep impl must be provided");
let time = self.conf().time_source().expect("time source must be provided");
pub async fn with_endpoint_discovery_enabled(self) -> #{Result}<(Self, #{endpoint_discovery}::ReloadEndpoint), #{ResolveEndpointError}> {
let handle = self.handle.clone();

// The original client without endpoint discover gets moved into the endpoint discovery
// resolver since calls to DescribeEndpoint without discovery need to be made.
let client_without_discovery = self;
let (resolver, reloader) = #{endpoint_discovery}::create_cache(
move || {
let client = self.clone();
let client = client_without_discovery.clone();
async move { resolve_endpoint(&client).await }
},
sleep,
time
)
.await?;
new_conf.endpoint_resolver = #{SharedEndpointResolver}::new(resolver);
Ok((Self::from_conf(new_conf), reloader))
handle.conf.sleep_impl()
.expect("endpoint discovery requires the client config to have a sleep impl"),
handle.conf.time_source()
.expect("endpoint discovery requires the client config to have a time source"),
).await?;

let client_with_discovery = crate::Client::from_conf(
handle.conf.to_builder()
.endpoint_resolver(#{SharedEndpointResolver}::new(resolver))
.build()
);
Ok((client_with_discovery, reloader))
}
}
""",
"endpoint_discovery" to endpointDiscovery.toType(),
"SystemTime" to RuntimeType.std.resolve("time::SystemTime"),
*RuntimeType.preludeScope,
"Arc" to RuntimeType.Arc,
"Duration" to RuntimeType.std.resolve("time::Duration"),
"SharedEndpointResolver" to RuntimeType.smithyHttp(codegenContext.runtimeConfig)
.resolve("endpoint::SharedEndpointResolver"),
"SystemTimeSource" to RuntimeType.smithyAsync(codegenContext.runtimeConfig)
.resolve("time::SystemTimeSource"),
"SystemTime" to RuntimeType.std.resolve("time::SystemTime"),
"endpoint_discovery" to endpointDiscovery.toType(),
*Types(codegenContext.runtimeConfig).toArray(),
*preludeScope,
)
}
}
Expand Down
17 changes: 17 additions & 0 deletions aws/sdk/integration-tests/s3/tests/config_to_builder.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
/*
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
* SPDX-License-Identifier: Apache-2.0
*/

use aws_sdk_s3::config::AppName;

#[tokio::test]
async fn test_config_to_builder() {
let config = aws_config::load_from_env().await;
let config = aws_sdk_s3::Config::new(&config);
// should not panic
let _ = config
.to_builder()
.app_name(AppName::new("SomeAppName").unwrap())
.build();
}
5 changes: 3 additions & 2 deletions aws/sdk/integration-tests/timestreamquery/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,9 @@ publish = false
[dev-dependencies]
aws-credential-types = { path = "../../build/aws-sdk/sdk/aws-credential-types", features = ["test-util"] }
aws-sdk-timestreamquery = { path = "../../build/aws-sdk/sdk/timestreamquery" }
tokio = { version = "1.23.1", features = ["full", "test-util"] }
aws-smithy-client = { path = "../../build/aws-sdk/sdk/aws-smithy-client", features = ["test-util"] }
aws-smithy-async = { path = "../../build/aws-sdk/sdk/aws-smithy-async", features = ["test-util"] }
aws-smithy-client = { path = "../../build/aws-sdk/sdk/aws-smithy-client", features = ["test-util"] }
aws-smithy-runtime = { path = "../../build/aws-sdk/sdk/aws-smithy-runtime", features = ["test-util"] }
aws-types = { path = "../../build/aws-sdk/sdk/aws-types" }
tokio = { version = "1.23.1", features = ["full", "test-util"] }
tracing-subscriber = "0.3.17"
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,8 @@ use std::time::{Duration, UNIX_EPOCH};

#[tokio::test]
async fn do_endpoint_discovery() {
tracing_subscriber::fmt::init();
let _logs = aws_smithy_runtime::test_util::capture_test_logs::capture_test_logs();

let conn = ReplayingConnection::from_file("tests/traffic.json").unwrap();
//let conn = aws_smithy_client::dvr::RecordingConnection::new(conn);
let start = UNIX_EPOCH + Duration::from_secs(1234567890);
Expand All @@ -32,7 +33,7 @@ async fn do_endpoint_discovery() {
.idempotency_token_provider("0000-0000-0000")
.build();
let (client, reloader) = query::Client::from_conf(conf)
.enable_endpoint_discovery()
.with_endpoint_discovery_enabled()
.await
.expect("initial setup of endpoint discovery failed");

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,16 +54,16 @@ class ConfigOverrideRuntimePluginGenerator(
initial_config: #{FrozenLayer},
initial_components: &#{RuntimeComponentsBuilder}
) -> Self {
let mut layer = #{Layer}::from(config_override.config)
.with_name("$moduleUseName::config::ConfigOverrideRuntimePlugin");
let mut layer = config_override.config;
let mut components = config_override.runtime_components;
let mut resolver = #{Resolver}::overrid(initial_config, initial_components, &mut layer, &mut components);

#{config}

let _ = resolver;
Self {
config: layer.freeze(),
config: #{Layer}::from(layer)
.with_name("$moduleUseName::config::ConfigOverrideRuntimePlugin").freeze(),
components,
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -336,7 +336,11 @@ class ServiceConfigGenerator(
if (runtimeMode.defaultToOrchestrator) {
rustTemplate(
"""
// Both `config` and `cloneable` are the same config, but the cloneable one
// is kept around so that it is possible to convert back into a builder. This can be
// optimized in the future.
pub(crate) config: #{FrozenLayer},
cloneable: #{CloneableLayer},
pub(crate) runtime_components: #{RuntimeComponentsBuilder},
pub(crate) runtime_plugins: #{Vec}<#{SharedRuntimePlugin}>,
""",
Expand Down Expand Up @@ -369,6 +373,20 @@ class ServiceConfigGenerator(
pub fn builder() -> Builder { Builder::default() }
""",
)
if (runtimeMode.defaultToOrchestrator) {
writer.rustTemplate(
"""
/// Converts this config back into a builder so that it can be tweaked.
pub fn to_builder(&self) -> Builder {
Builder {
config: self.cloneable.clone(),
runtime_components: self.runtime_components.clone(),
runtime_plugins: self.runtime_plugins.clone(),
}
}
""",
)
}
customizations.forEach {
it.section(ServiceConfig.ConfigImpl)(this)
}
Expand Down Expand Up @@ -478,12 +496,7 @@ class ServiceConfigGenerator(
rustBlock("pub fn build(mut self) -> Config") {
rustTemplate(
"""
// The builder is being turned into a service config. While doing so, we'd like to avoid
// requiring that items created and stored _during_ the build method be `Clone`, since they
// will soon be part of a `FrozenLayer` owned by the service config. So we will convert the
// current `CloneableLayer` into a `Layer` that does not impose the `Clone` requirement.
let mut layer = #{Layer}::from(self.config).with_name("$moduleUseName::config::Config");
##[allow(unused)]
let mut layer = self.config;
let mut resolver = #{Resolver}::initial(&mut layer, &mut self.runtime_components);
""",
*codegenScope,
Expand All @@ -495,12 +508,14 @@ class ServiceConfigGenerator(
customizations.forEach {
it.section(ServiceConfig.BuilderBuildExtras)(this)
}
rust(
rustTemplate(
"""
config: layer.freeze(),
config: #{Layer}::from(layer.clone()).with_name("$moduleUseName::config::Config").freeze(),
cloneable: layer,
runtime_components: self.runtime_components,
runtime_plugins: self.runtime_plugins,
""",
*codegenScope,
)
}
}
Expand Down
2 changes: 1 addition & 1 deletion rust-runtime/aws-smithy-client/src/dvr/replay.rs
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ impl ReplayingConnection {
))?
.take()
.await;
aws_smithy_protocol_test::assert_uris_match(actual.uri(), expected.uri());
aws_smithy_protocol_test::assert_uris_match(expected.uri(), actual.uri());
body_comparer(expected.body().as_ref(), actual.body().as_ref())?;
let expected_headers = expected
.headers()
Expand Down
21 changes: 13 additions & 8 deletions rust-runtime/aws-smithy-runtime/src/client/config_override.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@

use aws_smithy_async::rt::sleep::SharedAsyncSleep;
use aws_smithy_runtime_api::client::runtime_components::RuntimeComponentsBuilder;
use aws_smithy_types::config_bag::{FrozenLayer, Layer, Storable, Store, StoreReplace};
use aws_smithy_types::config_bag::{
CloneableLayer, FrozenLayer, Layer, Storable, Store, StoreReplace,
};

macro_rules! component {
($typ:ty, $accessor:ident, $latest_accessor:ident) => {
Expand Down Expand Up @@ -39,14 +41,14 @@ macro_rules! latest_component {
}

struct Initial<'a> {
config: &'a mut Layer,
config: &'a mut CloneableLayer,
components: &'a mut RuntimeComponentsBuilder,
}

struct Override<'a> {
initial_config: FrozenLayer,
initial_components: &'a RuntimeComponentsBuilder,
config: &'a mut Layer,
config: &'a mut CloneableLayer,
components: &'a mut RuntimeComponentsBuilder,
}

Expand Down Expand Up @@ -74,7 +76,10 @@ pub struct Resolver<'a> {

impl<'a> Resolver<'a> {
/// Construct a new [`Resolver`] in _initial mode_.
pub fn initial(config: &'a mut Layer, components: &'a mut RuntimeComponentsBuilder) -> Self {
pub fn initial(
config: &'a mut CloneableLayer,
components: &'a mut RuntimeComponentsBuilder,
) -> Self {
Self {
inner: Inner::Initial(Initial { config, components }),
}
Expand All @@ -84,7 +89,7 @@ impl<'a> Resolver<'a> {
pub fn overrid(
initial_config: FrozenLayer,
initial_components: &'a RuntimeComponentsBuilder,
config: &'a mut Layer,
config: &'a mut CloneableLayer,
components: &'a mut RuntimeComponentsBuilder,
) -> Self {
Self {
Expand All @@ -103,7 +108,7 @@ impl<'a> Resolver<'a> {
}

/// Returns a mutable reference to the latest config.
pub fn config_mut(&mut self) -> &mut Layer {
pub fn config_mut(&mut self) -> &mut CloneableLayer {
match &mut self.inner {
Inner::Initial(initial) => initial.config,
Inner::Override(overrid) => overrid.config,
Expand Down Expand Up @@ -180,7 +185,7 @@ mod tests {

#[test]
fn initial_mode_config() {
let mut config = Layer::new("test");
let mut config = CloneableLayer::new("test");
let mut components = RuntimeComponentsBuilder::new("test");

let mut resolver = Resolver::initial(&mut config, &mut components);
Expand All @@ -199,7 +204,7 @@ mod tests {
fn override_mode_config() {
let mut initial_config = CloneableLayer::new("initial");
let initial_components = RuntimeComponentsBuilder::new("initial");
let mut config = Layer::new("override");
let mut config = CloneableLayer::new("override");
let mut components = RuntimeComponentsBuilder::new("override");

let resolver = Resolver::overrid(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,7 @@ pub(super) async fn orchestrate_endpoint(
.endpoint_resolver()
.resolve_endpoint(params)
.await?;
tracing::debug!("will use endpoint {:?}", endpoint);
apply_endpoint(request, &endpoint, endpoint_prefix)?;

// Make the endpoint config available to interceptors
Expand Down
Loading
Loading