fix: Enable S3 integration tests to be configured via environment variables
dispanser authored and rtyler committed Dec 15, 2023
1 parent 363b6ea commit be75827
Showing 4 changed files with 72 additions and 61 deletions.
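The gist of the change: the test harness now honors S3-related environment variables that are already set instead of unconditionally overwriting them, and it restores the original environment when a test context is dropped. A rough sketch of what this enables (the types come from the crate's test_utils changed below; the endpoint and credential values are illustrative only):

    use deltalake_core::test_utils::{IntegrationContext, StorageIntegration};

    fn main() -> Result<(), Box<dyn std::error::Error>> {
        // Pre-set variables now win over the harness defaults, so a developer
        // can point the S3 suite at their own emulator or account.
        std::env::set_var("AWS_ENDPOINT_URL", "http://localhost:9000");
        std::env::set_var("AWS_ACCESS_KEY_ID", "my-key");
        std::env::set_var("AWS_SECRET_ACCESS_KEY", "my-secret");

        // The context snapshots the environment on creation and restores it
        // on drop, so one test's overrides cannot leak into the next.
        let _context = IntegrationContext::new(StorageIntegration::Amazon)?;
        Ok(())
    }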
123 changes: 64 additions & 59 deletions crates/deltalake-core/src/test_utils.rs
@@ -6,6 +6,7 @@ use fs_extra::dir::{copy, CopyOptions};
 use object_store::DynObjectStore;
 use rand::Rng;
 use serde_json::json;
+use std::collections::HashMap;
 use std::env;
 use std::sync::Arc;
 use tempdir::TempDir;
@@ -18,6 +19,8 @@ pub struct IntegrationContext {
     bucket: String,
     store: Arc<DynObjectStore>,
     tmp_dir: TempDir,
+    /// environment variables valid before `prepare_env()` modified them
+    env_vars: HashMap<String, String>,
 }
 
 impl IntegrationContext {
@@ -29,6 +32,9 @@ impl IntegrationContext {
         #[cfg(test)]
         dotenvy::dotenv().ok();
 
+        // save existing environment variables
+        let env_vars = std::env::vars().collect();
+
         integration.prepare_env();
 
         let tmp_dir = TempDir::new("")?;
@@ -97,6 +103,7 @@
             bucket,
             store,
             tmp_dir,
+            env_vars,
         })
     }
 
@@ -162,6 +169,18 @@ impl IntegrationContext {
         };
         Ok(())
     }
+
+    fn restore_env(&self) {
+        let env_vars: HashMap<_, _> = std::env::vars().collect();
+        for (key, _) in env_vars {
+            if !self.env_vars.contains_key(&key) {
+                std::env::remove_var(key)
+            }
+        }
+        for (key, value) in self.env_vars.iter() {
+            std::env::set_var(key, value);
+        }
+    }
 }
 
 impl Drop for IntegrationContext {
@@ -184,6 +203,7 @@ impl Drop for IntegrationContext {
                 hdfs_cli::delete_dir(&self.bucket).unwrap();
             }
         };
+        self.restore_env();
     }
 }
 
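The restore_env method and the call added to Drop implement a snapshot-and-restore guard for process-global state: variables added after the snapshot are removed, and every saved variable is reset to its original value. The same pattern as a standalone RAII sketch (the EnvGuard type is hypothetical, not part of this commit):

    use std::collections::HashMap;

    /// Snapshots the environment on creation, restores it on drop.
    struct EnvGuard {
        saved: HashMap<String, String>,
    }

    impl EnvGuard {
        fn new() -> Self {
            Self { saved: std::env::vars().collect() }
        }
    }

    impl Drop for EnvGuard {
        fn drop(&mut self) {
            // Drop variables that appeared after the snapshot ...
            for (key, _) in std::env::vars() {
                if !self.saved.contains_key(&key) {
                    std::env::remove_var(&key);
                }
            }
            // ... then reset every saved variable to its original value.
            for (key, value) in &self.saved {
                std::env::set_var(key, value);
            }
        }
    }

Note that mutating the environment is process-wide and not thread-safe, which is why the test files below also add #[serial].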
Expand Down Expand Up @@ -395,38 +415,17 @@ pub mod s3_cli {

/// Create a new bucket
pub fn create_bucket(bucket_name: impl AsRef<str>) -> std::io::Result<ExitStatus> {
let endpoint = std::env::var(s3_storage_options::AWS_ENDPOINT_URL)
.expect("variable ENDPOINT must be set to connect to S3");
let region = std::env::var(s3_storage_options::AWS_REGION)
.expect("variable AWS_REGION must be set to connect to S3");
let mut child = Command::new("aws")
.args([
"s3",
"mb",
bucket_name.as_ref(),
"--endpoint-url",
&endpoint,
"--region",
&region,
])
.args(["s3", "mb", bucket_name.as_ref()])
.spawn()
.expect("aws command is installed");
child.wait()
}

/// delete bucket
pub fn delete_bucket(bucket_name: impl AsRef<str>) -> std::io::Result<ExitStatus> {
let endpoint = std::env::var(s3_storage_options::AWS_ENDPOINT_URL)
.expect("variable ENDPOINT must be set to connect to S3");
let mut child = Command::new("aws")
.args([
"s3",
"rb",
bucket_name.as_ref(),
"--endpoint-url",
&endpoint,
"--force",
])
.args(["s3", "rb", bucket_name.as_ref(), "--force"])
.spawn()
.expect("aws command is installed");
child.wait()
@@ -437,16 +436,12 @@
         source: impl AsRef<str>,
         destination: impl AsRef<str>,
     ) -> std::io::Result<ExitStatus> {
-        let endpoint = std::env::var(s3_storage_options::AWS_ENDPOINT_URL)
-            .expect("variable ENDPOINT must be set to connect to S3");
         let mut child = Command::new("aws")
             .args([
                 "s3",
                 "cp",
                 source.as_ref(),
                 destination.as_ref(),
-                "--endpoint-url",
-                &endpoint,
                 "--recursive",
             ])
             .spawn()
@@ -456,13 +451,18 @@
 
     /// prepare_env
     pub fn prepare_env() {
-        set_env_if_not_set(
-            s3_storage_options::AWS_ENDPOINT_URL,
-            "http://localhost:4566",
-        );
+        match std::env::var(s3_storage_options::AWS_ENDPOINT_URL).ok() {
+            Some(endpoint_url) if endpoint_url.to_lowercase() == "none" => {
+                std::env::remove_var(s3_storage_options::AWS_ENDPOINT_URL)
+            }
+            Some(_) => (),
+            None => std::env::set_var(
+                s3_storage_options::AWS_ENDPOINT_URL,
+                "http://localhost:4566",
+            ),
+        }
         set_env_if_not_set(s3_storage_options::AWS_ACCESS_KEY_ID, "deltalake");
         set_env_if_not_set(s3_storage_options::AWS_SECRET_ACCESS_KEY, "weloverust");
-        set_env_if_not_set("AWS_DEFAULT_REGION", "us-east-1");
+        set_env_if_not_set(s3_storage_options::AWS_REGION, "us-east-1");
         set_env_if_not_set(s3_storage_options::AWS_S3_LOCKING_PROVIDER, "dynamodb");
         set_env_if_not_set("DYNAMO_LOCK_TABLE_NAME", "test_table");
@@ -472,44 +472,58 @@
 
     fn create_dynamodb_table(
         table_name: &str,
-        endpoint_url: &str,
         attr_definitions: &[&str],
         key_schema: &[&str],
     ) -> std::io::Result<ExitStatus> {
         println!("creating table {}", table_name);
-        let args01 = [
+        let args = [
             "dynamodb",
             "create-table",
             "--table-name",
             &table_name,
-            "--endpoint-url",
-            &endpoint_url,
             "--provisioned-throughput",
             "ReadCapacityUnits=10,WriteCapacityUnits=10",
             "--attribute-definitions",
         ];
-        let args: Vec<_> = args01
-            .iter()
-            .chain(attr_definitions.iter())
-            .chain(["--key-schema"].iter())
-            .chain(key_schema)
-            .collect();
         let mut child = Command::new("aws")
             .args(args)
+            .args(attr_definitions.iter())
+            .arg("--key-schema")
+            .args(key_schema)
             .stdout(Stdio::null())
             .spawn()
             .expect("aws command is installed");
-        child.wait()
+        let status = child.wait()?;
+        wait_for_table(table_name)?;
+        Ok(status)
     }
 
+    fn find_subsequence(haystack: &[u8], needle: &[u8]) -> Option<usize> {
+        haystack
+            .windows(needle.len())
+            .position(|window| window == needle)
+    }
+
+    fn wait_for_table(table_name: &str) -> std::io::Result<()> {
+        let args = ["dynamodb", "describe-table", "--table-name", &table_name];
+        loop {
+            let output = Command::new("aws")
+                .args(args)
+                .output()
+                .expect("aws command is installed");
+            if find_subsequence(&output.stdout, "CREATING".as_bytes()).is_some() {
+                std::thread::sleep(std::time::Duration::from_millis(200));
+                continue;
+            } else {
+                return Ok(());
+            }
+        }
+    }
+
     pub fn create_lock_table() -> std::io::Result<ExitStatus> {
-        let endpoint_url = std::env::var(s3_storage_options::AWS_ENDPOINT_URL)
-            .expect("variable AWS_ENDPOINT_URL must be set to connect to S3 emulator");
         let table_name =
             std::env::var("DELTA_DYNAMO_TABLE_NAME").unwrap_or_else(|_| "delta_log".into());
         create_dynamodb_table(
             &table_name,
-            &endpoint_url,
             &[
                 "AttributeName=tablePath,AttributeType=S",
                 "AttributeName=fileName,AttributeType=S",
@@ -521,28 +535,19 @@ pub mod s3_cli {
         )
     }
 
-    fn delete_dynamodb_table(table_name: &str, endpoint_url: &str) -> std::io::Result<ExitStatus> {
+    fn delete_dynamodb_table(table_name: &str) -> std::io::Result<ExitStatus> {
         let mut child = Command::new("aws")
-            .args([
-                "dynamodb",
-                "delete-table",
-                "--table-name",
-                &table_name,
-                "--endpoint-url",
-                &endpoint_url,
-            ])
+            .args(["dynamodb", "delete-table", "--table-name", &table_name])
             .stdout(Stdio::null())
            .spawn()
            .expect("aws command is installed");
        child.wait()
    }
 
     pub fn delete_lock_table() -> std::io::Result<ExitStatus> {
-        let endpoint_url = std::env::var(s3_storage_options::AWS_ENDPOINT_URL)
-            .expect("variable AWS_ENDPOINT_URL must be set to connect to S3 emulator");
         let table_name =
             std::env::var("DELTA_DYNAMO_TABLE_NAME").unwrap_or_else(|_| "delta_log".into());
-        delete_dynamodb_table(&table_name, &endpoint_url)
+        delete_dynamodb_table(&table_name)
     }
 }

2 changes: 2 additions & 0 deletions crates/deltalake-core/tests/integration_checkpoint.rs
@@ -13,6 +13,7 @@ use std::time::Duration;
 use tokio::time::sleep;
 
 #[tokio::test]
+#[serial]
 async fn cleanup_metadata_fs_test() -> TestResult {
     let context = IntegrationContext::new(StorageIntegration::Local)?;
     cleanup_metadata_test(&context).await?;
@@ -109,6 +110,7 @@ async fn cleanup_metadata_test(context: &IntegrationContext) -> TestResult {
 }
 
 #[tokio::test]
+#[serial]
 async fn test_issue_1420_cleanup_expired_logs_for() -> DeltaResult<()> {
     let _ = std::fs::remove_dir_all("./tests/data/issue_1420");
 
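#[serial] comes from the serial_test crate, which the diff imports in the concurrent-writes file below; it forces the annotated tests to run one at a time. That matters now because IntegrationContext mutates process-wide environment variables, and Rust runs tests on multiple threads by default. A minimal sketch of the attribute's effect (hypothetical test bodies):

    use serial_test::serial;

    // Both tests touch the same process-wide environment, so `#[serial]`
    // guarantees they never run at the same time.
    #[tokio::test]
    #[serial]
    async fn sets_an_env_var() {
        std::env::set_var("AWS_ENDPOINT_URL", "http://localhost:4566");
        // ... exercise code that reads the variable ...
    }

    #[tokio::test]
    #[serial]
    async fn reads_the_env() {
        // Never overlaps with `sets_an_env_var`, so it observes a
        // consistent environment.
        let _ = std::env::var("AWS_ENDPOINT_URL");
    }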
5 changes: 5 additions & 0 deletions crates/deltalake-core/tests/integration_concurrent_writes.rs
@@ -6,26 +6,30 @@ use deltalake_core::operations::DeltaOps;
 use deltalake_core::protocol::{DeltaOperation, SaveMode};
 use deltalake_core::test_utils::{IntegrationContext, StorageIntegration, TestResult, TestTables};
 use deltalake_core::{DeltaTable, DeltaTableBuilder};
+use serial_test::serial;
 use std::collections::HashMap;
 use std::future::Future;
 use std::iter::FromIterator;
 use std::time::Duration;
 
 #[tokio::test]
+#[serial]
 async fn test_concurrent_writes_local() -> TestResult {
     test_concurrent_writes(StorageIntegration::Local).await?;
     Ok(())
 }
 
 #[cfg(feature = "s3")]
 #[tokio::test]
+#[serial]
 async fn concurrent_writes_s3() -> TestResult {
     test_concurrent_writes(StorageIntegration::Amazon).await?;
     Ok(())
 }
 
 #[cfg(feature = "azure")]
 #[tokio::test]
+#[serial]
 async fn test_concurrent_writes_azure() -> TestResult {
     test_concurrent_writes(StorageIntegration::Microsoft).await?;
     Ok(())
@@ -35,6 +39,7 @@ async fn test_concurrent_writes_azure() -> TestResult {
 #[ignore]
 #[cfg(feature = "hdfs")]
 #[tokio::test]
+#[serial]
 async fn test_concurrent_writes_hdfs() -> TestResult {
     test_concurrent_writes(StorageIntegration::Hdfs).await?;
     Ok(())
3 changes: 1 addition & 2 deletions crates/deltalake-core/tests/integration_s3_dynamodb.rs
@@ -148,7 +148,7 @@ async fn test_repair_on_update() -> TestResult<()> {
 const WORKERS: i64 = 3;
 const COMMITS: i64 = 5;
 
-#[tokio::test]
+#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
 #[serial]
 async fn test_concurrent_writers() -> TestResult<()> {
     // Goal: a test with multiple writers, very similar to `integration_concurrent_writes`
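The flavor change matters because plain #[tokio::test] runs on Tokio's single-threaded current_thread runtime, where "concurrent" writer tasks only interleave on one OS thread. A sketch of the contrast (hypothetical test names):

    // Default: current-thread runtime; spawned writers interleave on a
    // single OS thread and blocking work serializes them.
    #[tokio::test]
    async fn writers_interleaved() { /* ... */ }

    // Multi-threaded runtime with 4 worker threads: writer tasks can make
    // progress in parallel, which is what WORKERS = 3 concurrent writers
    // are meant to exercise.
    #[tokio::test(flavor = "multi_thread", worker_threads = 4)]
    async fn writers_parallel() { /* ... */ }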
@@ -260,7 +260,6 @@ fn add_action(name: &str) -> Action {
 }
 
 async fn prepare_table(context: &IntegrationContext, table_name: &str) -> TestResult<DeltaTable> {
-    make_client()?.try_create_lock_table().await?;
     let table_name = format!("{}_{}", table_name, uuid::Uuid::new_v4());
     let table_uri = context.uri_for_table(TestTables::Custom(table_name.to_owned()));
     let schema = StructType::new(vec![StructField::new(
