From be75827bcf685e2e69ea9f1928b1450c019fb481 Mon Sep 17 00:00:00 2001 From: Thomas Peiselt Date: Wed, 13 Dec 2023 18:15:35 +0100 Subject: [PATCH] fix: Enable S3 integration tests to be configured via environment variables --- crates/deltalake-core/src/test_utils.rs | 123 +++++++++--------- .../tests/integration_checkpoint.rs | 2 + .../tests/integration_concurrent_writes.rs | 5 + .../tests/integration_s3_dynamodb.rs | 3 +- 4 files changed, 72 insertions(+), 61 deletions(-) diff --git a/crates/deltalake-core/src/test_utils.rs b/crates/deltalake-core/src/test_utils.rs index 190b720baf..c594a80a63 100644 --- a/crates/deltalake-core/src/test_utils.rs +++ b/crates/deltalake-core/src/test_utils.rs @@ -6,6 +6,7 @@ use fs_extra::dir::{copy, CopyOptions}; use object_store::DynObjectStore; use rand::Rng; use serde_json::json; +use std::collections::HashMap; use std::env; use std::sync::Arc; use tempdir::TempDir; @@ -18,6 +19,8 @@ pub struct IntegrationContext { bucket: String, store: Arc, tmp_dir: TempDir, + /// environment variables valid before `prepare_env()` modified them + env_vars: HashMap, } impl IntegrationContext { @@ -29,6 +32,9 @@ impl IntegrationContext { #[cfg(test)] dotenvy::dotenv().ok(); + // save existing environment variables + let env_vars = std::env::vars().collect(); + integration.prepare_env(); let tmp_dir = TempDir::new("")?; @@ -97,6 +103,7 @@ impl IntegrationContext { bucket, store, tmp_dir, + env_vars, }) } @@ -162,6 +169,18 @@ impl IntegrationContext { }; Ok(()) } + + fn restore_env(&self) { + let env_vars: HashMap<_, _> = std::env::vars().collect(); + for (key, _) in env_vars { + if !self.env_vars.contains_key(&key) { + std::env::remove_var(key) + } + } + for (key, value) in self.env_vars.iter() { + std::env::set_var(key, value); + } + } } impl Drop for IntegrationContext { @@ -184,6 +203,7 @@ impl Drop for IntegrationContext { hdfs_cli::delete_dir(&self.bucket).unwrap(); } }; + self.restore_env(); } } @@ -395,20 +415,8 @@ pub mod s3_cli { /// 
Create a new bucket pub fn create_bucket(bucket_name: impl AsRef) -> std::io::Result { - let endpoint = std::env::var(s3_storage_options::AWS_ENDPOINT_URL) - .expect("variable ENDPOINT must be set to connect to S3"); - let region = std::env::var(s3_storage_options::AWS_REGION) - .expect("variable AWS_REGION must be set to connect to S3"); let mut child = Command::new("aws") - .args([ - "s3", - "mb", - bucket_name.as_ref(), - "--endpoint-url", - &endpoint, - "--region", - &region, - ]) + .args(["s3", "mb", bucket_name.as_ref()]) .spawn() .expect("aws command is installed"); child.wait() @@ -416,17 +424,8 @@ pub mod s3_cli { /// delete bucket pub fn delete_bucket(bucket_name: impl AsRef) -> std::io::Result { - let endpoint = std::env::var(s3_storage_options::AWS_ENDPOINT_URL) - .expect("variable ENDPOINT must be set to connect to S3"); let mut child = Command::new("aws") - .args([ - "s3", - "rb", - bucket_name.as_ref(), - "--endpoint-url", - &endpoint, - "--force", - ]) + .args(["s3", "rb", bucket_name.as_ref(), "--force"]) .spawn() .expect("aws command is installed"); child.wait() @@ -437,16 +436,12 @@ pub mod s3_cli { source: impl AsRef, destination: impl AsRef, ) -> std::io::Result { - let endpoint = std::env::var(s3_storage_options::AWS_ENDPOINT_URL) - .expect("variable ENDPOINT must be set to connect to S3"); let mut child = Command::new("aws") .args([ "s3", "cp", source.as_ref(), destination.as_ref(), - "--endpoint-url", - &endpoint, "--recursive", ]) .spawn() @@ -456,13 +451,18 @@ pub mod s3_cli { /// prepare_env pub fn prepare_env() { - set_env_if_not_set( - s3_storage_options::AWS_ENDPOINT_URL, - "http://localhost:4566", - ); + match std::env::var(s3_storage_options::AWS_ENDPOINT_URL).ok() { + Some(endpoint_url) if endpoint_url.to_lowercase() == "none" => { + std::env::remove_var(s3_storage_options::AWS_ENDPOINT_URL) + } + Some(_) => (), + None => std::env::set_var( + s3_storage_options::AWS_ENDPOINT_URL, + "http://localhost:4566", + ), + }
set_env_if_not_set(s3_storage_options::AWS_ACCESS_KEY_ID, "deltalake"); set_env_if_not_set(s3_storage_options::AWS_SECRET_ACCESS_KEY, "weloverust"); - set_env_if_not_set("AWS_DEFAULT_REGION", "us-east-1"); set_env_if_not_set(s3_storage_options::AWS_REGION, "us-east-1"); set_env_if_not_set(s3_storage_options::AWS_S3_LOCKING_PROVIDER, "dynamodb"); set_env_if_not_set("DYNAMO_LOCK_TABLE_NAME", "test_table"); @@ -472,44 +472,58 @@ pub mod s3_cli { fn create_dynamodb_table( table_name: &str, - endpoint_url: &str, attr_definitions: &[&str], key_schema: &[&str], ) -> std::io::Result { - println!("creating table {}", table_name); - let args01 = [ + let args = [ "dynamodb", "create-table", "--table-name", &table_name, - "--endpoint-url", - &endpoint_url, "--provisioned-throughput", "ReadCapacityUnits=10,WriteCapacityUnits=10", "--attribute-definitions", ]; - let args: Vec<_> = args01 - .iter() - .chain(attr_definitions.iter()) - .chain(["--key-schema"].iter()) - .chain(key_schema) - .collect(); let mut child = Command::new("aws") .args(args) + .args(attr_definitions.iter()) + .arg("--key-schema") + .args(key_schema) .stdout(Stdio::null()) .spawn() .expect("aws command is installed"); - child.wait() + let status = child.wait()?; + wait_for_table(table_name)?; + Ok(status) + } + + fn find_subsequence(haystack: &[u8], needle: &[u8]) -> Option { + haystack + .windows(needle.len()) + .position(|window| window == needle) + } + + fn wait_for_table(table_name: &str) -> std::io::Result<()> { + let args = ["dynamodb", "describe-table", "--table-name", &table_name]; + loop { + let output = Command::new("aws") + .args(args) + .output() + .expect("aws command is installed"); + if find_subsequence(&output.stdout, "CREATING".as_bytes()).is_some() { + std::thread::sleep(std::time::Duration::from_millis(200)); + continue; + } else { + return Ok(()); + } + } } pub fn create_lock_table() -> std::io::Result { - let endpoint_url = std::env::var(s3_storage_options::AWS_ENDPOINT_URL) - 
.expect("variable AWS_ENDPOINT_URL must be set to connect to S3 emulator"); let table_name = std::env::var("DELTA_DYNAMO_TABLE_NAME").unwrap_or_else(|_| "delta_log".into()); create_dynamodb_table( &table_name, - &endpoint_url, &[ "AttributeName=tablePath,AttributeType=S", "AttributeName=fileName,AttributeType=S", @@ -521,16 +535,9 @@ pub mod s3_cli { ) } - fn delete_dynamodb_table(table_name: &str, endpoint_url: &str) -> std::io::Result { + fn delete_dynamodb_table(table_name: &str) -> std::io::Result { let mut child = Command::new("aws") - .args([ - "dynamodb", - "delete-table", - "--table-name", - &table_name, - "--endpoint-url", - &endpoint_url, - ]) + .args(["dynamodb", "delete-table", "--table-name", &table_name]) .stdout(Stdio::null()) .spawn() .expect("aws command is installed"); @@ -538,11 +545,9 @@ pub mod s3_cli { } pub fn delete_lock_table() -> std::io::Result { - let endpoint_url = std::env::var(s3_storage_options::AWS_ENDPOINT_URL) - .expect("variable AWS_ENDPOINT_URL must be set to connect to S3 emulator"); let table_name = std::env::var("DELTA_DYNAMO_TABLE_NAME").unwrap_or_else(|_| "delta_log".into()); - delete_dynamodb_table(&table_name, &endpoint_url) + delete_dynamodb_table(&table_name) } } diff --git a/crates/deltalake-core/tests/integration_checkpoint.rs b/crates/deltalake-core/tests/integration_checkpoint.rs index 56d253eb85..768b1172db 100644 --- a/crates/deltalake-core/tests/integration_checkpoint.rs +++ b/crates/deltalake-core/tests/integration_checkpoint.rs @@ -13,6 +13,7 @@ use std::time::Duration; use tokio::time::sleep; #[tokio::test] +#[serial] async fn cleanup_metadata_fs_test() -> TestResult { let context = IntegrationContext::new(StorageIntegration::Local)?; cleanup_metadata_test(&context).await?; @@ -109,6 +110,7 @@ async fn cleanup_metadata_test(context: &IntegrationContext) -> TestResult { } #[tokio::test] +#[serial] async fn test_issue_1420_cleanup_expired_logs_for() -> DeltaResult<()> { let _ = 
std::fs::remove_dir_all("./tests/data/issue_1420"); diff --git a/crates/deltalake-core/tests/integration_concurrent_writes.rs b/crates/deltalake-core/tests/integration_concurrent_writes.rs index 67fbbe38c5..f57167f2c1 100644 --- a/crates/deltalake-core/tests/integration_concurrent_writes.rs +++ b/crates/deltalake-core/tests/integration_concurrent_writes.rs @@ -6,12 +6,14 @@ use deltalake_core::operations::DeltaOps; use deltalake_core::protocol::{DeltaOperation, SaveMode}; use deltalake_core::test_utils::{IntegrationContext, StorageIntegration, TestResult, TestTables}; use deltalake_core::{DeltaTable, DeltaTableBuilder}; +use serial_test::serial; use std::collections::HashMap; use std::future::Future; use std::iter::FromIterator; use std::time::Duration; #[tokio::test] +#[serial] async fn test_concurrent_writes_local() -> TestResult { test_concurrent_writes(StorageIntegration::Local).await?; Ok(()) @@ -19,6 +21,7 @@ async fn test_concurrent_writes_local() -> TestResult { #[cfg(feature = "s3")] #[tokio::test] +#[serial] async fn concurrent_writes_s3() -> TestResult { test_concurrent_writes(StorageIntegration::Amazon).await?; Ok(()) @@ -26,6 +29,7 @@ async fn concurrent_writes_s3() -> TestResult { #[cfg(feature = "azure")] #[tokio::test] +#[serial] async fn test_concurrent_writes_azure() -> TestResult { test_concurrent_writes(StorageIntegration::Microsoft).await?; Ok(()) @@ -35,6 +39,7 @@ async fn test_concurrent_writes_azure() -> TestResult { #[ignore] #[cfg(feature = "hdfs")] #[tokio::test] +#[serial] async fn test_concurrent_writes_hdfs() -> TestResult { test_concurrent_writes(StorageIntegration::Hdfs).await?; Ok(()) diff --git a/crates/deltalake-core/tests/integration_s3_dynamodb.rs b/crates/deltalake-core/tests/integration_s3_dynamodb.rs index f347e45efd..38bd8e3a16 100644 --- a/crates/deltalake-core/tests/integration_s3_dynamodb.rs +++ b/crates/deltalake-core/tests/integration_s3_dynamodb.rs @@ -148,7 +148,7 @@ async fn test_repair_on_update() -> TestResult<()> 
{ const WORKERS: i64 = 3; const COMMITS: i64 = 5; -#[tokio::test] +#[tokio::test(flavor = "multi_thread", worker_threads = 4)] #[serial] async fn test_concurrent_writers() -> TestResult<()> { // Goal: a test with multiple writers, very similar to `integration_concurrent_writes` @@ -260,7 +260,6 @@ fn add_action(name: &str) -> Action { } async fn prepare_table(context: &IntegrationContext, table_name: &str) -> TestResult { - make_client()?.try_create_lock_table().await?; let table_name = format!("{}_{}", table_name, uuid::Uuid::new_v4()); let table_uri = context.uri_for_table(TestTables::Custom(table_name.to_owned())); let schema = StructType::new(vec![StructField::new(