Skip to content

Commit

Permalink
feat(aws_s3 sink): add option to use virtual addressing (#21999)
Browse files Browse the repository at this point in the history
* feat(aws_s3 sink): add option to use virtual addressing

Signed-off-by: Scott Miller <[email protected]>

* fix(aws_s3 sink): move force_path_style to S3ClientBuilder

Signed-off-by: Scott Miller <[email protected]>

* fix(aws_s3 sink): simplify S3ClientBuilder logic

---------

Signed-off-by: Scott Miller <[email protected]>
  • Loading branch information
sam6258 authored Dec 19, 2024
1 parent 7c6d0c9 commit 029a2ff
Show file tree
Hide file tree
Showing 22 changed files with 112 additions and 25 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
Adds a `force_path_style` option to the `aws_s3` sink that allows users to configure virtual host style addressing. The value defaults to `true` to maintain existing behavior.

authors: sam6258
22 changes: 15 additions & 7 deletions src/aws/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ pub trait ClientBuilder {
type Client;

/// Build the client using the given config settings.
fn build(config: &SdkConfig) -> Self::Client;
fn build(&self, config: &SdkConfig) -> Self::Client;
}

fn region_provider(
Expand Down Expand Up @@ -161,28 +161,36 @@ async fn resolve_region(
}

/// Create the SDK client using the provided settings.
pub async fn create_client<T: ClientBuilder>(
pub async fn create_client<T>(
builder: &T,
auth: &AwsAuthentication,
region: Option<Region>,
endpoint: Option<String>,
proxy: &ProxyConfig,
tls_options: &Option<TlsConfig>,
timeout: &Option<AwsTimeout>,
) -> crate::Result<T::Client> {
create_client_and_region::<T>(auth, region, endpoint, proxy, tls_options, timeout)
) -> crate::Result<T::Client>
where
T: ClientBuilder,
{
create_client_and_region::<T>(builder, auth, region, endpoint, proxy, tls_options, timeout)
.await
.map(|(client, _)| client)
}

/// Create the SDK client and resolve the region using the provided settings.
pub async fn create_client_and_region<T: ClientBuilder>(
pub async fn create_client_and_region<T>(
builder: &T,
auth: &AwsAuthentication,
region: Option<Region>,
endpoint: Option<String>,
proxy: &ProxyConfig,
tls_options: &Option<TlsConfig>,
timeout: &Option<AwsTimeout>,
) -> crate::Result<(T::Client, Region)> {
) -> crate::Result<(T::Client, Region)>
where
T: ClientBuilder,
{
let retry_config = RetryConfig::disabled();

// The default credentials chains will look for a region if not given but we'd like to
Expand Down Expand Up @@ -239,7 +247,7 @@ pub async fn create_client_and_region<T: ClientBuilder>(

let config = config_builder.build();

Ok((T::build(&config), region))
Ok((T::build(builder, &config), region))
}

#[derive(Snafu, Debug)]
Expand Down
11 changes: 7 additions & 4 deletions src/common/s3.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,16 @@ use aws_sdk_s3::config;

use crate::aws::ClientBuilder;

pub(crate) struct S3ClientBuilder;
pub(crate) struct S3ClientBuilder {
pub force_path_style: Option<bool>,
}

impl ClientBuilder for S3ClientBuilder {
type Client = aws_sdk_s3::client::Client;

fn build(config: &aws_types::SdkConfig) -> Self::Client {
let config = config::Builder::from(config).force_path_style(true).build();
aws_sdk_s3::client::Client::from_conf(config)
fn build(&self, config: &aws_types::SdkConfig) -> Self::Client {
let builder =
config::Builder::from(config).force_path_style(self.force_path_style.unwrap_or(true));
aws_sdk_s3::client::Client::from_conf(builder.build())
}
}
2 changes: 1 addition & 1 deletion src/common/sqs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ pub(crate) struct SqsClientBuilder;
impl ClientBuilder for SqsClientBuilder {
type Client = aws_sdk_sqs::client::Client;

fn build(config: &aws_types::SdkConfig) -> Self::Client {
fn build(&self, config: &aws_types::SdkConfig) -> Self::Client {
aws_sdk_sqs::client::Client::new(config)
}
}
3 changes: 2 additions & 1 deletion src/secrets/aws_secrets_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ pub(crate) struct SecretsManagerClientBuilder;
impl ClientBuilder for SecretsManagerClientBuilder {
type Client = Client;

fn build(config: &aws_types::SdkConfig) -> Self::Client {
fn build(&self, config: &aws_types::SdkConfig) -> Self::Client {
let config = config::Builder::from(config).build();
Client::from_conf(config)
}
Expand Down Expand Up @@ -57,6 +57,7 @@ impl SecretBackend for AwsSecretsManagerBackend {
_: &mut signal::SignalRx,
) -> crate::Result<HashMap<String, String>> {
let client = create_client::<SecretsManagerClientBuilder>(
&SecretsManagerClientBuilder {},
&self.auth,
self.region.region(),
self.region.endpoint(),
Expand Down
3 changes: 2 additions & 1 deletion src/sinks/aws_cloudwatch_logs/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ pub struct CloudwatchLogsClientBuilder;
impl ClientBuilder for CloudwatchLogsClientBuilder {
type Client = aws_sdk_cloudwatchlogs::client::Client;

fn build(config: &aws_types::SdkConfig) -> Self::Client {
fn build(&self, config: &aws_types::SdkConfig) -> Self::Client {
aws_sdk_cloudwatchlogs::client::Client::new(config)
}
}
Expand Down Expand Up @@ -169,6 +169,7 @@ pub struct CloudwatchLogsSinkConfig {
impl CloudwatchLogsSinkConfig {
pub async fn create_client(&self, proxy: &ProxyConfig) -> crate::Result<CloudwatchLogsClient> {
create_client::<CloudwatchLogsClientBuilder>(
&CloudwatchLogsClientBuilder {},
&self.auth,
self.region.region(),
self.region.endpoint(),
Expand Down
14 changes: 11 additions & 3 deletions src/sinks/aws_cloudwatch_logs/integration_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -467,9 +467,17 @@ async fn create_client_test() -> CloudwatchLogsClient {
let endpoint = Some(cloudwatch_address());
let proxy = ProxyConfig::default();

create_client::<CloudwatchLogsClientBuilder>(&auth, region, endpoint, &proxy, &None, &None)
.await
.unwrap()
create_client::<CloudwatchLogsClientBuilder>(
&CloudwatchLogsClientBuilder {},
&auth,
region,
endpoint,
&proxy,
&None,
&None,
)
.await
.unwrap()
}

async fn ensure_group() {
Expand Down
3 changes: 2 additions & 1 deletion src/sinks/aws_cloudwatch_metrics/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ struct CloudwatchMetricsClientBuilder;
impl ClientBuilder for CloudwatchMetricsClientBuilder {
type Client = aws_sdk_cloudwatch::client::Client;

fn build(config: &aws_types::SdkConfig) -> Self::Client {
fn build(&self, config: &aws_types::SdkConfig) -> Self::Client {
aws_sdk_cloudwatch::client::Client::new(config)
}
}
Expand Down Expand Up @@ -172,6 +172,7 @@ impl CloudWatchMetricsSinkConfig {
};

create_client::<CloudwatchMetricsClientBuilder>(
&CloudwatchMetricsClientBuilder {},
&self.auth,
region,
self.region.endpoint(),
Expand Down
3 changes: 2 additions & 1 deletion src/sinks/aws_kinesis/firehose/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ pub struct KinesisFirehoseClientBuilder;
impl ClientBuilder for KinesisFirehoseClientBuilder {
type Client = KinesisClient;

fn build(config: &aws_types::SdkConfig) -> Self::Client {
fn build(&self, config: &aws_types::SdkConfig) -> Self::Client {
Self::Client::new(config)
}
}
Expand Down Expand Up @@ -102,6 +102,7 @@ impl KinesisFirehoseSinkConfig {

pub async fn create_client(&self, proxy: &ProxyConfig) -> crate::Result<KinesisClient> {
create_client::<KinesisFirehoseClientBuilder>(
&KinesisFirehoseClientBuilder {},
&self.base.auth,
self.base.region.region(),
self.base.region.endpoint(),
Expand Down
1 change: 1 addition & 0 deletions src/sinks/aws_kinesis/firehose/integration_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -255,6 +255,7 @@ async fn firehose_client() -> aws_sdk_firehose::Client {
let proxy = ProxyConfig::default();

create_client::<KinesisFirehoseClientBuilder>(
&KinesisFirehoseClientBuilder {},
&auth,
region_endpoint.region(),
region_endpoint.endpoint(),
Expand Down
3 changes: 2 additions & 1 deletion src/sinks/aws_kinesis/streams/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ pub struct KinesisClientBuilder;
impl ClientBuilder for KinesisClientBuilder {
type Client = KinesisClient;

fn build(config: &aws_types::SdkConfig) -> Self::Client {
fn build(&self, config: &aws_types::SdkConfig) -> Self::Client {
KinesisClient::new(config)
}
}
Expand Down Expand Up @@ -99,6 +99,7 @@ impl KinesisStreamsSinkConfig {

pub async fn create_client(&self, proxy: &ProxyConfig) -> crate::Result<KinesisClient> {
create_client::<KinesisClientBuilder>(
&KinesisClientBuilder {},
&self.base.auth,
self.base.region.region(),
self.base.region.endpoint(),
Expand Down
1 change: 1 addition & 0 deletions src/sinks/aws_kinesis/streams/integration_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,7 @@ async fn client() -> aws_sdk_kinesis::Client {
let proxy = ProxyConfig::default();
let region = RegionOrEndpoint::with_both("us-east-1", kinesis_address());
create_client::<KinesisClientBuilder>(
&KinesisClientBuilder {},
&auth,
region.region(),
region.endpoint(),
Expand Down
16 changes: 15 additions & 1 deletion src/sinks/aws_s3/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,12 @@ pub struct S3SinkConfig {
#[configurable(derived)]
#[serde(default)]
pub timezone: Option<TimeZone>,

/// Specifies which addressing style to use.
///
/// This controls if the bucket name is in the hostname or part of the URL.
#[serde(default = "crate::serde::default_true")]
pub force_path_style: bool,
}

pub(super) fn default_key_prefix() -> String {
Expand Down Expand Up @@ -167,6 +173,7 @@ impl GenerateConfig for S3SinkConfig {
auth: AwsAuthentication::default(),
acknowledgements: Default::default(),
timezone: Default::default(),
force_path_style: Default::default(),
})
.unwrap()
}
Expand Down Expand Up @@ -251,7 +258,14 @@ impl S3SinkConfig {
}

pub async fn create_service(&self, proxy: &ProxyConfig) -> crate::Result<S3Service> {
s3_common::config::create_service(&self.region, &self.auth, proxy, &self.tls).await
s3_common::config::create_service(
&self.region,
&self.auth,
proxy,
&self.tls,
self.force_path_style,
)
.await
}
}

Expand Down
7 changes: 7 additions & 0 deletions src/sinks/aws_s3/integration_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -438,6 +438,7 @@ async fn s3_flush_on_exhaustion() {
auth: Default::default(),
acknowledgements: Default::default(),
timezone: Default::default(),
force_path_style: true,
}
};
let prefix = config.key_prefix.clone();
Expand Down Expand Up @@ -489,7 +490,12 @@ async fn client() -> S3Client {
let region = RegionOrEndpoint::with_both("us-east-1", s3_address());
let proxy = ProxyConfig::default();
let tls_options = None;
let force_path_style_value: bool = true;

create_client::<S3ClientBuilder>(
&S3ClientBuilder {
force_path_style: Some(force_path_style_value),
},
&auth,
region.region(),
region.endpoint(),
Expand Down Expand Up @@ -522,6 +528,7 @@ fn config(bucket: &str, batch_size: usize) -> S3SinkConfig {
auth: Default::default(),
acknowledgements: Default::default(),
timezone: Default::default(),
force_path_style: true,
}
}

Expand Down
3 changes: 2 additions & 1 deletion src/sinks/aws_s_s/sns/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ impl GenerateConfig for SnsSinkConfig {
impl SnsSinkConfig {
pub(super) async fn create_client(&self, proxy: &ProxyConfig) -> crate::Result<SnsClient> {
create_client::<SnsClientBuilder>(
&SnsClientBuilder {},
&self.base_config.auth,
self.region.region(),
self.region.endpoint(),
Expand Down Expand Up @@ -108,7 +109,7 @@ pub(super) struct SnsClientBuilder;
impl ClientBuilder for SnsClientBuilder {
type Client = aws_sdk_sns::client::Client;

fn build(config: &aws_types::SdkConfig) -> Self::Client {
fn build(&self, config: &aws_types::SdkConfig) -> Self::Client {
aws_sdk_sns::client::Client::new(config)
}
}
Expand Down
2 changes: 2 additions & 0 deletions src/sinks/aws_s_s/sns/integration_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ async fn create_sns_test_client() -> SnsClient {
let endpoint = sns_address();
let proxy = ProxyConfig::default();
create_client::<SnsClientBuilder>(
&SnsClientBuilder {},
&auth,
Some(Region::new("us-east-1")),
Some(endpoint),
Expand All @@ -53,6 +54,7 @@ async fn create_sqs_test_client() -> SqsClient {
let endpoint = sqs_address();
let proxy = ProxyConfig::default();
create_client::<SqsClientBuilder>(
&SqsClientBuilder {},
&auth,
Some(Region::new("us-east-1")),
Some(endpoint),
Expand Down
1 change: 1 addition & 0 deletions src/sinks/aws_s_s/sqs/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ impl GenerateConfig for SqsSinkConfig {
impl SqsSinkConfig {
pub(super) async fn create_client(&self, proxy: &ProxyConfig) -> crate::Result<SqsClient> {
create_client::<SqsClientBuilder>(
&SqsClientBuilder {},
&self.base_config.auth,
self.region.region(),
self.region.endpoint(),
Expand Down
1 change: 1 addition & 0 deletions src/sinks/aws_s_s/sqs/integration_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ async fn create_test_client() -> SqsClient {
let endpoint = sqs_address();
let proxy = ProxyConfig::default();
create_client::<SqsClientBuilder>(
&SqsClientBuilder {},
&auth,
Some(Region::new("us-east-1")),
Some(endpoint),
Expand Down
18 changes: 15 additions & 3 deletions src/sinks/s3_common/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -363,12 +363,24 @@ pub async fn create_service(
auth: &AwsAuthentication,
proxy: &ProxyConfig,
tls_options: &Option<TlsConfig>,
force_path_style: impl Into<bool>,
) -> crate::Result<S3Service> {
let endpoint = region.endpoint();
let region = region.region();
let client =
create_client::<S3ClientBuilder>(auth, region.clone(), endpoint, proxy, tls_options, &None)
.await?;
let force_path_style_value: bool = force_path_style.into();

let client = create_client::<S3ClientBuilder>(
&S3ClientBuilder {
force_path_style: Some(force_path_style_value),
},
auth,
region.clone(),
endpoint,
proxy,
tls_options,
&None,
)
.await?;
Ok(S3Service::new(client))
}

Expand Down
10 changes: 10 additions & 0 deletions src/sources/aws_s3/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -230,8 +230,12 @@ impl AwsS3Config {
) -> crate::Result<sqs::Ingestor> {
let region = self.region.region();
let endpoint = self.region.endpoint();
let force_path_style_value: bool = true;

let s3_client = create_client::<S3ClientBuilder>(
&S3ClientBuilder {
force_path_style: Some(force_path_style_value),
},
&self.auth,
region.clone(),
endpoint.clone(),
Expand All @@ -248,6 +252,7 @@ impl AwsS3Config {
match self.sqs {
Some(ref sqs) => {
let (sqs_client, region) = create_client_and_region::<SqsClientBuilder>(
&SqsClientBuilder {},
&self.auth,
region.clone(),
endpoint,
Expand Down Expand Up @@ -1016,7 +1021,11 @@ mod integration_tests {
endpoint: Some(s3_address()),
};
let proxy_config = ProxyConfig::default();
let force_path_style_value: bool = true;
create_client::<S3ClientBuilder>(
&S3ClientBuilder {
force_path_style: Some(force_path_style_value),
},
&auth,
region_endpoint.region(),
region_endpoint.endpoint(),
Expand All @@ -1036,6 +1045,7 @@ mod integration_tests {
};
let proxy_config = ProxyConfig::default();
create_client::<SqsClientBuilder>(
&SqsClientBuilder {},
&auth,
region_endpoint.region(),
region_endpoint.endpoint(),
Expand Down
Loading

0 comments on commit 029a2ff

Please sign in to comment.