Skip to content

Commit

Permalink
chore(sinks): Drop the custom SinkContext::default implementation (v…
Browse files Browse the repository at this point in the history
…ectordotdev#17804)

The `fn SinkContext::new_test` is actually just `SinkContext::default`
in disguise, so drop the custom function in favour of the auto-derived
implementation.
  • Loading branch information
bruceg authored Jun 29, 2023
1 parent 659e1e6 commit e66e285
Show file tree
Hide file tree
Showing 42 changed files with 106 additions and 116 deletions.
12 changes: 1 addition & 11 deletions src/config/sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 +235,7 @@ pub trait SinkConfig: DynClone + NamedComponent + core::fmt::Debug + Send + Sync

dyn_clone::clone_trait_object!(SinkConfig);

#[derive(Debug, Clone)]
#[derive(Clone, Debug, Default)]
pub struct SinkContext {
pub healthcheck: SinkHealthcheckOptions,
pub globals: GlobalOptions,
Expand All @@ -244,16 +244,6 @@ pub struct SinkContext {
}

impl SinkContext {
#[cfg(test)]
pub fn new_test() -> Self {
Self {
healthcheck: SinkHealthcheckOptions::default(),
globals: GlobalOptions::default(),
proxy: ProxyConfig::default(),
schema: schema::Options::default(),
}
}

pub const fn globals(&self) -> &GlobalOptions {
&self.globals
}
Expand Down
4 changes: 2 additions & 2 deletions src/sinks/amqp/integration_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ async fn amqp_happy_path() {
.await
.unwrap();

let cx = SinkContext::new_test();
let cx = SinkContext::default();
let (sink, healthcheck) = config.build(cx).await.unwrap();
healthcheck.await.expect("Health check failed");

Expand Down Expand Up @@ -153,7 +153,7 @@ async fn amqp_round_trip() {
.await
.unwrap();

let cx = SinkContext::new_test();
let cx = SinkContext::default();
let (amqp_sink, healthcheck) = config.build(cx).await.unwrap();
healthcheck.await.expect("Health check failed");

Expand Down
2 changes: 1 addition & 1 deletion src/sinks/appsignal/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -251,7 +251,7 @@ mod test {
.expect("config should be valid");
config.endpoint = mock_endpoint.to_string();

let context = SinkContext::new_test();
let context = SinkContext::default();
let (sink, _healthcheck) = config.build(context).await.unwrap();

let event = Event::Log(LogEvent::from("simple message"));
Expand Down
12 changes: 6 additions & 6 deletions src/sinks/aws_cloudwatch_logs/integration_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ async fn cloudwatch_insert_log_event() {
acknowledgements: Default::default(),
};

let (sink, _) = config.build(SinkContext::new_test()).await.unwrap();
let (sink, _) = config.build(SinkContext::default()).await.unwrap();

let timestamp = chrono::Utc::now();

Expand Down Expand Up @@ -101,7 +101,7 @@ async fn cloudwatch_insert_log_events_sorted() {
acknowledgements: Default::default(),
};

let (sink, _) = config.build(SinkContext::new_test()).await.unwrap();
let (sink, _) = config.build(SinkContext::default()).await.unwrap();

let timestamp = chrono::Utc::now() - Duration::days(1);

Expand Down Expand Up @@ -176,7 +176,7 @@ async fn cloudwatch_insert_out_of_range_timestamp() {
acknowledgements: Default::default(),
};

let (sink, _) = config.build(SinkContext::new_test()).await.unwrap();
let (sink, _) = config.build(SinkContext::default()).await.unwrap();

let now = chrono::Utc::now();

Expand Down Expand Up @@ -255,7 +255,7 @@ async fn cloudwatch_dynamic_group_and_stream_creation() {
acknowledgements: Default::default(),
};

let (sink, _) = config.build(SinkContext::new_test()).await.unwrap();
let (sink, _) = config.build(SinkContext::default()).await.unwrap();

let timestamp = chrono::Utc::now();

Expand Down Expand Up @@ -310,7 +310,7 @@ async fn cloudwatch_insert_log_event_batched() {
acknowledgements: Default::default(),
};

let (sink, _) = config.build(SinkContext::new_test()).await.unwrap();
let (sink, _) = config.build(SinkContext::default()).await.unwrap();

let timestamp = chrono::Utc::now();

Expand Down Expand Up @@ -360,7 +360,7 @@ async fn cloudwatch_insert_log_event_partitioned() {
acknowledgements: Default::default(),
};

let (sink, _) = config.build(SinkContext::new_test()).await.unwrap();
let (sink, _) = config.build(SinkContext::default()).await.unwrap();

let timestamp = chrono::Utc::now();

Expand Down
4 changes: 2 additions & 2 deletions src/sinks/aws_cloudwatch_metrics/integration_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ async fn cloudwatch_metrics_healthcheck() {

#[tokio::test]
async fn cloudwatch_metrics_put_data() {
let cx = SinkContext::new_test();
let cx = SinkContext::default();
let config = config();
let client = config.create_client(&cx.globals.proxy).await.unwrap();
let sink = CloudWatchMetricsSvc::new(config, client).unwrap();
Expand Down Expand Up @@ -94,7 +94,7 @@ async fn cloudwatch_metrics_put_data() {

#[tokio::test]
async fn cloudwatch_metrics_namespace_partitioning() {
let cx = SinkContext::new_test();
let cx = SinkContext::default();
let config = config();
let client = config.create_client(&cx.globals.proxy).await.unwrap();
let sink = CloudWatchMetricsSvc::new(config, client).unwrap();
Expand Down
2 changes: 1 addition & 1 deletion src/sinks/aws_kinesis/firehose/integration_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ async fn firehose_put_records() {

let config = KinesisFirehoseSinkConfig { batch, base };

let cx = SinkContext::new_test();
let cx = SinkContext::default();

let (sink, _) = config.build(cx).await.unwrap();

Expand Down
4 changes: 2 additions & 2 deletions src/sinks/aws_kinesis/firehose/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ async fn check_batch_size() {

let config = KinesisFirehoseSinkConfig { batch, base };

let cx = SinkContext::new_test();
let cx = SinkContext::default();
let res = config.build(cx).await;

assert_eq!(
Expand Down Expand Up @@ -69,7 +69,7 @@ async fn check_batch_events() {

let config = KinesisFirehoseSinkConfig { batch, base };

let cx = SinkContext::new_test();
let cx = SinkContext::default();
let res = config.build(cx).await;

assert_eq!(
Expand Down
4 changes: 2 additions & 2 deletions src/sinks/aws_kinesis/streams/integration_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ fn kinesis_address() -> String {
// base,
// };
//
// let cx = SinkContext::new_test();
// let cx = SinkContext::default();
//
// let sink = config.build(cx).await.unwrap().0;
//
Expand Down Expand Up @@ -107,7 +107,7 @@ async fn kinesis_put_records_without_partition_key() {
base,
};

let cx = SinkContext::new_test();
let cx = SinkContext::default();

let sink = config.build(cx).await.unwrap().0;

Expand Down
18 changes: 9 additions & 9 deletions src/sinks/aws_s3/integration_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ fn s3_address() -> String {

#[tokio::test]
async fn s3_insert_message_into_with_flat_key_prefix() {
let cx = SinkContext::new_test();
let cx = SinkContext::default();

let bucket = uuid::Uuid::new_v4().to_string();

Expand Down Expand Up @@ -85,7 +85,7 @@ async fn s3_insert_message_into_with_flat_key_prefix() {

#[tokio::test]
async fn s3_insert_message_into_with_folder_key_prefix() {
let cx = SinkContext::new_test();
let cx = SinkContext::default();

let bucket = uuid::Uuid::new_v4().to_string();

Expand Down Expand Up @@ -119,7 +119,7 @@ async fn s3_insert_message_into_with_folder_key_prefix() {

#[tokio::test]
async fn s3_insert_message_into_with_ssekms_key_id() {
let cx = SinkContext::new_test();
let cx = SinkContext::default();

let bucket = uuid::Uuid::new_v4().to_string();

Expand Down Expand Up @@ -156,7 +156,7 @@ async fn s3_insert_message_into_with_ssekms_key_id() {

#[tokio::test]
async fn s3_rotate_files_after_the_buffer_size_is_reached() {
let cx = SinkContext::new_test();
let cx = SinkContext::default();

let bucket = uuid::Uuid::new_v4().to_string();

Expand Down Expand Up @@ -213,7 +213,7 @@ async fn s3_gzip() {
// to 1000, and using gzip compression. We test to ensure that all of the keys we end up
// writing represent the sum total of the lines: we expect 3 batches, each of which should
// have 1000 lines.
let cx = SinkContext::new_test();
let cx = SinkContext::default();

let bucket = uuid::Uuid::new_v4().to_string();

Expand Down Expand Up @@ -258,7 +258,7 @@ async fn s3_zstd() {
// to 1000, and using zstd compression. We test to ensure that all of the keys we end up
// writing represent the sum total of the lines: we expect 3 batches, each of which should
// have 1000 lines.
let cx = SinkContext::new_test();
let cx = SinkContext::default();

let bucket = uuid::Uuid::new_v4().to_string();

Expand Down Expand Up @@ -303,7 +303,7 @@ async fn s3_zstd() {
// https://github.com/localstack/localstack/issues/4166
#[tokio::test]
async fn s3_insert_message_into_object_lock() {
let cx = SinkContext::new_test();
let cx = SinkContext::default();

let bucket = uuid::Uuid::new_v4().to_string();

Expand Down Expand Up @@ -357,7 +357,7 @@ async fn s3_insert_message_into_object_lock() {

#[tokio::test]
async fn acknowledges_failures() {
let cx = SinkContext::new_test();
let cx = SinkContext::default();

let bucket = uuid::Uuid::new_v4().to_string();

Expand Down Expand Up @@ -408,7 +408,7 @@ async fn s3_healthchecks_invalid_bucket() {

#[tokio::test]
async fn s3_flush_on_exhaustion() {
let cx = SinkContext::new_test();
let cx = SinkContext::default();

let bucket = uuid::Uuid::new_v4().to_string();
create_bucket(&bucket, false).await;
Expand Down
2 changes: 1 addition & 1 deletion src/sinks/axiom.rs
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ mod integration_tests {
assert!(!token.is_empty(), "$AXIOM_TOKEN required");
let dataset = env::var("AXIOM_DATASET").unwrap();

let cx = SinkContext::new_test();
let cx = SinkContext::default();

let config = AxiomConfig {
url: Some(url.clone()),
Expand Down
6 changes: 3 additions & 3 deletions src/sinks/azure_monitor_logs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -459,7 +459,7 @@ mod tests {
default_headers: HeaderMap::new(),
};

let context = SinkContext::new_test();
let context = SinkContext::default();
let client =
HttpClient::new(None, &context.proxy).expect("should not fail to create HTTP client");

Expand Down Expand Up @@ -617,7 +617,7 @@ mod tests {
"#,
)
.unwrap();
if config.build(SinkContext::new_test()).await.is_ok() {
if config.build(SinkContext::default()).await.is_ok() {
panic!("config.build failed to error");
}
}
Expand Down Expand Up @@ -657,7 +657,7 @@ mod tests {
"#,
)
.unwrap();
if config.build(SinkContext::new_test()).await.is_ok() {
if config.build(SinkContext::default()).await.is_ok() {
panic!("config.build failed to error");
}
}
Expand Down
12 changes: 6 additions & 6 deletions src/sinks/clickhouse/integration_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ async fn insert_events() {
)
.await;

let (sink, _hc) = config.build(SinkContext::new_test()).await.unwrap();
let (sink, _hc) = config.build(SinkContext::default()).await.unwrap();

let (mut input_event, mut receiver) = make_event();
input_event
Expand Down Expand Up @@ -114,7 +114,7 @@ async fn skip_unknown_fields() {
.create_table(&table, "host String, timestamp String, message String")
.await;

let (sink, _hc) = config.build(SinkContext::new_test()).await.unwrap();
let (sink, _hc) = config.build(SinkContext::default()).await.unwrap();

let (mut input_event, mut receiver) = make_event();
input_event.as_mut_log().insert("unknown", "mysteries");
Expand Down Expand Up @@ -167,7 +167,7 @@ async fn insert_events_unix_timestamps() {
)
.await;

let (sink, _hc) = config.build(SinkContext::new_test()).await.unwrap();
let (sink, _hc) = config.build(SinkContext::default()).await.unwrap();

let (mut input_event, _receiver) = make_event();

Expand Down Expand Up @@ -235,7 +235,7 @@ timestamp_format = "unix""#,
)
.await;

let (sink, _hc) = config.build(SinkContext::new_test()).await.unwrap();
let (sink, _hc) = config.build(SinkContext::default()).await.unwrap();

let (mut input_event, _receiver) = make_event();

Expand Down Expand Up @@ -298,7 +298,7 @@ async fn no_retry_on_incorrect_data() {
.create_table(&table, "host String, timestamp String")
.await;

let (sink, _hc) = config.build(SinkContext::new_test()).await.unwrap();
let (sink, _hc) = config.build(SinkContext::default()).await.unwrap();

let (input_event, mut receiver) = make_event();

Expand Down Expand Up @@ -340,7 +340,7 @@ async fn no_retry_on_incorrect_data_warp() {
batch,
..Default::default()
};
let (sink, _hc) = config.build(SinkContext::new_test()).await.unwrap();
let (sink, _hc) = config.build(SinkContext::default()).await.unwrap();

let (input_event, mut receiver) = make_event();

Expand Down
2 changes: 1 addition & 1 deletion src/sinks/databend/integration_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ async fn insert_event_with_cfg(cfg: String, table: String, client: DatabendAPICl
.unwrap();

let (config, _) = load_sink::<DatabendConfig>(&cfg).unwrap();
let (sink, _hc) = config.build(SinkContext::new_test()).await.unwrap();
let (sink, _hc) = config.build(SinkContext::default()).await.unwrap();

let (input_event, mut receiver) = make_event();
run_and_assert_sink_compliance(
Expand Down
2 changes: 1 addition & 1 deletion src/sinks/elasticsearch/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -238,7 +238,7 @@ impl ElasticsearchCommon {
#[cfg(test)]
pub async fn parse_single(config: &ElasticsearchConfig) -> crate::Result<Self> {
let mut commons =
Self::parse_many(config, crate::config::SinkContext::new_test().proxy()).await?;
Self::parse_many(config, crate::config::SinkContext::default().proxy()).await?;
assert_eq!(commons.len(), 1);
Ok(commons.remove(0))
}
Expand Down
6 changes: 3 additions & 3 deletions src/sinks/elasticsearch/integration_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ async fn structures_events_correctly() {
.expect("Config error");
let base_url = common.base_url.clone();

let cx = SinkContext::new_test();
let cx = SinkContext::default();
let (sink, _hc) = config.build(cx.clone()).await.unwrap();

let (batch, mut receiver) = BatchNotifier::new_with_receiver();
Expand Down Expand Up @@ -555,7 +555,7 @@ async fn run_insert_tests_with_config(
};
let base_url = common.base_url.clone();

let cx = SinkContext::new_test();
let cx = SinkContext::default();
let (sink, healthcheck) = config
.build(cx.clone())
.await
Expand Down Expand Up @@ -639,7 +639,7 @@ async fn run_insert_tests_with_config(
}

async fn run_insert_tests_with_multiple_endpoints(config: &ElasticsearchConfig) {
let cx = SinkContext::new_test();
let cx = SinkContext::default();
let commons = ElasticsearchCommon::parse_many(config, cx.proxy())
.await
.expect("Config error");
Expand Down
2 changes: 1 addition & 1 deletion src/sinks/gcp/chronicle_unstructured.rs
Original file line number Diff line number Diff line change
Expand Up @@ -541,7 +541,7 @@ mod integration_tests {
log_type: &str,
auth_path: &str,
) -> crate::Result<(VectorSink, crate::sinks::Healthcheck)> {
let cx = SinkContext::new_test();
let cx = SinkContext::default();
config(log_type, auth_path).build(cx).await
}

Expand Down
Loading

0 comments on commit e66e285

Please sign in to comment.