Skip to content

Commit

Permalink
Add incomplete Event Stream support with working Amazon Transcribe ex…
Browse files Browse the repository at this point in the history
…ample (#653)

* Add incomplete Event Stream support with working Amazon Transcribe example

* Make the raw response in SdkError generic

* Fix XmlBindingTraitSerializerGeneratorTest

* Make the build aware of the SMITHYRS_EXPERIMENTAL_EVENTSTREAM switch

* Fix SigV4SigningCustomizationTest

* Update changelog

* Fix build when SMITHYRS_EXPERIMENTAL_EVENTSTREAM is not set

* Add initial unit test for EventStreamUnmarshallerGenerator

* Add event header unmarshalling support

* Don't pull in event stream dependencies by default

* Only add event stream signer to config for services that need it

* Move event stream inlineables into smithy-eventstream

* Fix some clippy lints

* Transform event stream unions

* Fix crash in SigV4SigningDecorator

* Add test for unmarshalling errors

* Incorporate CR feedback
  • Loading branch information
jdisanti authored Aug 20, 2021
1 parent b119782 commit 3b8f69c
Show file tree
Hide file tree
Showing 73 changed files with 2,389 additions and 245 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ vNext (Month Day, Year)

**New this week**

- (When complete) Add Event Stream support (#653, #xyz)
- (When complete) Add profile file provider for region (#594, #xyz)

v0.21 (August 19th, 2021)
Expand Down
5 changes: 5 additions & 0 deletions aws/rust-runtime/aws-sig-auth/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,17 @@ license = "Apache-2.0"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[features]
sign-eventstream = ["smithy-eventstream", "aws-sigv4/sign-eventstream"]
default = []

[dependencies]
http = "0.2.2"
aws-sigv4 = { path = "../aws-sigv4" }
aws-auth = { path = "../aws-auth" }
aws-types = { path = "../aws-types" }
smithy-http = { path = "../../../rust-runtime/smithy-http" }
smithy-eventstream = { path = "../../../rust-runtime/smithy-eventstream", optional = true }
# Trying this out as an experiment. thiserror can be removed and replaced with hand written error
# implementations and it is not a breaking change.
thiserror = "1"
Expand Down
129 changes: 129 additions & 0 deletions aws/rust-runtime/aws-sig-auth/src/event_stream.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
/*
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
* SPDX-License-Identifier: Apache-2.0.
*/

use crate::middleware::Signature;
use aws_auth::Credentials;
use aws_sigv4::event_stream::sign_message;
use aws_sigv4::SigningParams;
use aws_types::region::SigningRegion;
use aws_types::SigningService;
use smithy_eventstream::frame::{Message, SignMessage, SignMessageError};
use smithy_http::property_bag::PropertyBag;
use std::sync::{Arc, Mutex, MutexGuard};
use std::time::SystemTime;

/// Event Stream SigV4 signing implementation.
#[derive(Debug)]
pub struct SigV4Signer {
properties: Arc<Mutex<PropertyBag>>,
last_signature: Option<String>,
}

impl SigV4Signer {
pub fn new(properties: Arc<Mutex<PropertyBag>>) -> Self {
Self {
properties,
last_signature: None,
}
}
}

impl SignMessage for SigV4Signer {
fn sign(&mut self, message: Message) -> Result<Message, SignMessageError> {
let properties = PropertyAccessor(self.properties.lock().unwrap());
if self.last_signature.is_none() {
// The Signature property should exist in the property bag for all Event Stream requests.
self.last_signature = Some(properties.expect::<Signature>().as_ref().into())
}

// Every single one of these values would have been retrieved during the initial request,
// so we can safely assume they all exist in the property bag at this point.
let credentials = properties.expect::<Credentials>();
let region = properties.expect::<SigningRegion>();
let signing_service = properties.expect::<SigningService>();
let time = properties
.get::<SystemTime>()
.copied()
.unwrap_or_else(SystemTime::now);
let params = SigningParams {
access_key: credentials.access_key_id(),
secret_key: credentials.secret_access_key(),
security_token: credentials.session_token(),
region: region.as_ref(),
service_name: signing_service.as_ref(),
date_time: time.into(),
settings: (),
};

let (signed_message, signature) =
sign_message(&message, self.last_signature.as_ref().unwrap(), &params).into_parts();
self.last_signature = Some(signature);

Ok(signed_message)
}
}

// TODO(EventStream): Make a new type around `Arc<Mutex<PropertyBag>>` called `SharedPropertyBag`
// and abstract the mutex away entirely.
struct PropertyAccessor<'a>(MutexGuard<'a, PropertyBag>);

impl<'a> PropertyAccessor<'a> {
fn get<T: Send + Sync + 'static>(&self) -> Option<&T> {
self.0.get::<T>()
}

fn expect<T: Send + Sync + 'static>(&self) -> &T {
self.get::<T>()
.expect("property should have been inserted into property bag via middleware")
}
}

#[cfg(test)]
mod tests {
use crate::event_stream::SigV4Signer;
use crate::middleware::Signature;
use aws_auth::Credentials;
use aws_types::region::Region;
use aws_types::region::SigningRegion;
use aws_types::SigningService;
use smithy_eventstream::frame::{HeaderValue, Message, SignMessage};
use smithy_http::property_bag::PropertyBag;
use std::sync::{Arc, Mutex};
use std::time::{Duration, UNIX_EPOCH};

#[test]
fn sign_message() {
let region = Region::new("us-east-1");
let mut properties = PropertyBag::new();
properties.insert(region.clone());
properties.insert(UNIX_EPOCH + Duration::new(1611160427, 0));
properties.insert(SigningService::from_static("transcribe"));
properties.insert(Credentials::from_keys("AKIAfoo", "bar", None));
properties.insert(SigningRegion::from(region));
properties.insert(Signature::new("initial-signature".into()));

let mut signer = SigV4Signer::new(Arc::new(Mutex::new(properties)));
let mut signatures = Vec::new();
for _ in 0..5 {
let signed = signer
.sign(Message::new(&b"identical message"[..]))
.unwrap();
if let HeaderValue::ByteArray(signature) = signed
.headers()
.iter()
.find(|h| h.name().as_str() == ":chunk-signature")
.unwrap()
.value()
{
signatures.push(signature.clone());
} else {
panic!("failed to get the :chunk-signature")
}
}
for i in 1..signatures.len() {
assert_ne!(signatures[i - 1], signatures[i]);
}
}
}
3 changes: 3 additions & 0 deletions aws/rust-runtime/aws-sig-auth/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,5 +7,8 @@
//!
//! In the future, additional signature algorithms can be enabled as Cargo Features.
#[cfg(feature = "sign-eventstream")]
pub mod event_stream;

pub mod middleware;
pub mod signer;
4 changes: 3 additions & 1 deletion aws/rust-runtime/aws-sig-auth/src/middleware.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,10 @@ impl Signature {
pub fn new(signature: String) -> Self {
Self(signature)
}
}

pub fn as_str(&self) -> &str {
impl AsRef<str> for Signature {
fn as_ref(&self) -> &str {
&self.0
}
}
Expand Down
2 changes: 1 addition & 1 deletion aws/rust-runtime/aws-sigv4/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ description = "AWS SigV4 signer"
[features]
sign-http = ["http", "http-body", "percent-encoding", "form_urlencoded"]
sign-eventstream = ["smithy-eventstream"]
default = ["sign-http", "sign-eventstream"]
default = ["sign-http"]

[dependencies]
chrono = { version = "0.4", default-features = false, features = ["clock", "std"] }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,5 +32,5 @@ object AwsRuntimeType {
val S3Errors by lazy { RuntimeType.forInlineDependency(InlineAwsDependency.forRustFile("s3_errors")) }
}

fun RuntimeConfig.awsRuntimeDependency(name: String, features: List<String> = listOf()): CargoDependency =
fun RuntimeConfig.awsRuntimeDependency(name: String, features: Set<String> = setOf()): CargoDependency =
CargoDependency(name, awsRoot().crateLocation(), features = features)
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,8 @@ class IntegrationTestDependencies(
override fun section(section: LibRsSection) = when (section) {
LibRsSection.Body -> writable {
if (hasTests) {
val smithyClient = CargoDependency.SmithyClient(runtimeConfig).copy(features = listOf("test-util"), scope = DependencyScope.Dev)
val smithyClient = CargoDependency.SmithyClient(runtimeConfig)
.copy(features = setOf("test-util"), scope = DependencyScope.Dev)
addDependency(smithyClient)
addDependency(SerdeJson)
addDependency(Tokio)
Expand All @@ -63,5 +64,5 @@ class IntegrationTestDependencies(
}

val Criterion = CargoDependency("criterion", CratesIo("0.3"), scope = DependencyScope.Dev)
val SerdeJson = CargoDependency("serde_json", CratesIo("1"), features = emptyList(), scope = DependencyScope.Dev)
val Tokio = CargoDependency("tokio", CratesIo("1"), features = listOf("macros", "test-util"), scope = DependencyScope.Dev)
val SerdeJson = CargoDependency("serde_json", CratesIo("1"), features = emptySet(), scope = DependencyScope.Dev)
val Tokio = CargoDependency("tokio", CratesIo("1"), features = setOf("macros", "test-util"), scope = DependencyScope.Dev)
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,14 @@ import software.amazon.smithy.model.shapes.OperationShape
import software.amazon.smithy.model.shapes.ServiceShape
import software.amazon.smithy.model.shapes.ShapeId
import software.amazon.smithy.model.traits.OptionalAuthTrait
import software.amazon.smithy.rust.codegen.rustlang.CargoDependency
import software.amazon.smithy.rust.codegen.rustlang.Writable
import software.amazon.smithy.rust.codegen.rustlang.asType
import software.amazon.smithy.rust.codegen.rustlang.rust
import software.amazon.smithy.rust.codegen.rustlang.rustTemplate
import software.amazon.smithy.rust.codegen.rustlang.writable
import software.amazon.smithy.rust.codegen.smithy.RuntimeConfig
import software.amazon.smithy.rust.codegen.smithy.RuntimeType
import software.amazon.smithy.rust.codegen.smithy.customize.OperationCustomization
import software.amazon.smithy.rust.codegen.smithy.customize.OperationSection
import software.amazon.smithy.rust.codegen.smithy.customize.RustCodegenDecorator
Expand All @@ -28,11 +30,14 @@ import software.amazon.smithy.rust.codegen.smithy.generators.config.ServiceConfi
import software.amazon.smithy.rust.codegen.smithy.letIf
import software.amazon.smithy.rust.codegen.util.dq
import software.amazon.smithy.rust.codegen.util.expectTrait
import software.amazon.smithy.rust.codegen.util.hasEventStreamOperations
import software.amazon.smithy.rust.codegen.util.hasTrait
import software.amazon.smithy.rust.codegen.util.isInputEventStream

/**
* The SigV4SigningDecorator:
* - adds a `signing_service()` method to `config` to return the default signing service
* - adds a `new_event_stream_signer()` method to `config` to create an Event Stream SigV4 signer
* - sets the `SigningService` during operation construction
* - sets a default `OperationSigningConfig` A future enhancement will customize this for specific services that need
* different behavior.
Expand All @@ -47,8 +52,12 @@ class SigV4SigningDecorator : RustCodegenDecorator {
protocolConfig: ProtocolConfig,
baseCustomizations: List<ConfigCustomization>
): List<ConfigCustomization> {
return baseCustomizations.letIf(applies(protocolConfig)) {
it + SigV4SigningConfig(protocolConfig.serviceShape.expectTrait())
return baseCustomizations.letIf(applies(protocolConfig)) { customizations ->
customizations + SigV4SigningConfig(
protocolConfig.runtimeConfig,
protocolConfig.serviceShape.hasEventStreamOperations(protocolConfig.model),
protocolConfig.serviceShape.expectTrait()
)
}
}

Expand All @@ -58,26 +67,63 @@ class SigV4SigningDecorator : RustCodegenDecorator {
baseCustomizations: List<OperationCustomization>
): List<OperationCustomization> {
return baseCustomizations.letIf(applies(protocolConfig)) {
it + SigV4SigningFeature(operation, protocolConfig.runtimeConfig, protocolConfig.serviceShape, protocolConfig.model)
it + SigV4SigningFeature(
protocolConfig.model,
operation,
protocolConfig.runtimeConfig,
protocolConfig.serviceShape,
)
}
}
}

class SigV4SigningConfig(private val sigV4Trait: SigV4Trait) : ConfigCustomization() {
class SigV4SigningConfig(
runtimeConfig: RuntimeConfig,
private val serviceHasEventStream: Boolean,
private val sigV4Trait: SigV4Trait
) : ConfigCustomization() {
private val codegenScope = arrayOf(
"SigV4Signer" to RuntimeType(
"SigV4Signer",
runtimeConfig.awsRuntimeDependency("aws-sig-auth", setOf("sign-eventstream")),
"aws_sig_auth::event_stream"
),
"PropertyBag" to RuntimeType(
"PropertyBag",
CargoDependency.SmithyHttp(runtimeConfig),
"smithy_http::property_bag"
)
)

override fun section(section: ServiceConfig): Writable {
return when (section) {
is ServiceConfig.ConfigImpl -> writable {
rust(
rustTemplate(
"""
/// The signature version 4 service signing name to use in the credential scope when signing requests.
///
/// The signing service may be overidden by the `Endpoint`, or by specifying a custom [`SigningService`](aws_types::SigningService) during
/// operation construction
/// The signing service may be overridden by the `Endpoint`, or by specifying a custom
/// [`SigningService`](aws_types::SigningService) during operation construction
pub fn signing_service(&self) -> &'static str {
${sigV4Trait.name.dq()}
}
"""
""",
*codegenScope
)
if (serviceHasEventStream) {
rustTemplate(
"""
/// Creates a new Event Stream `SignMessage` implementor.
pub fn new_event_stream_signer(
&self,
properties: std::sync::Arc<std::sync::Mutex<#{PropertyBag}>>
) -> #{SigV4Signer} {
#{SigV4Signer}::new(properties)
}
""",
*codegenScope
)
}
}
else -> emptySection
}
Expand All @@ -95,10 +141,10 @@ fun disableDoubleEncode(service: ServiceShape) = when {
}

class SigV4SigningFeature(
private val model: Model,
private val operation: OperationShape,
runtimeConfig: RuntimeConfig,
private val service: ServiceShape,
model: Model
) :
OperationCustomization() {
private val codegenScope =
Expand All @@ -111,9 +157,9 @@ class SigV4SigningFeature(
is OperationSection.MutateRequest -> writable {
rustTemplate(
"""
##[allow(unused_mut)]
let mut signing_config = #{sig_auth}::signer::OperationSigningConfig::default_config();
""",
##[allow(unused_mut)]
let mut signing_config = #{sig_auth}::signer::OperationSigningConfig::default_config();
""",
*codegenScope
)
if (needsAmzSha256(service)) {
Expand All @@ -128,6 +174,12 @@ class SigV4SigningFeature(
"${section.request}.properties_mut().insert(#{sig_auth}::signer::SignableBody::UnsignedPayload);",
*codegenScope
)
} else if (operation.isInputEventStream(model)) {
// TODO(EventStream): Is this actually correct for all Event Stream operations?
rustTemplate(
"${section.request}.properties_mut().insert(#{sig_auth}::signer::SignableBody::Bytes(&[]));",
*codegenScope
)
}
// some operations are either unsigned or optionally signed:
val authSchemes = serviceIndex.getEffectiveAuthSchemes(service, operation)
Expand All @@ -140,9 +192,9 @@ class SigV4SigningFeature(
}
rustTemplate(
"""
${section.request}.properties_mut().insert(signing_config);
${section.request}.properties_mut().insert(#{aws_types}::SigningService::from_static(${section.config}.signing_service()));
""",
${section.request}.properties_mut().insert(signing_config);
${section.request}.properties_mut().insert(#{aws_types}::SigningService::from_static(${section.config}.signing_service()));
""",
*codegenScope
)
}
Expand Down
Loading

0 comments on commit 3b8f69c

Please sign in to comment.