fix: enable S3 integration tests to be configured via environment vars #1966

Merged 3 commits on Dec 15, 2023
Changes from all commits
crates/deltalake-core/src/test_utils.rs (123 changes: 64 additions & 59 deletions)
@@ -6,6 +6,7 @@ use fs_extra::dir::{copy, CopyOptions};
use object_store::DynObjectStore;
use rand::Rng;
use serde_json::json;
use std::collections::HashMap;
use std::env;
use std::sync::Arc;
use tempdir::TempDir;
@@ -18,6 +19,8 @@ pub struct IntegrationContext {
bucket: String,
store: Arc<DynObjectStore>,
tmp_dir: TempDir,
/// environment variables valid before `prepare_env()` modified them
env_vars: HashMap<String, String>,
}

impl IntegrationContext {
@@ -29,6 +32,9 @@ impl IntegrationContext {
#[cfg(test)]
dotenvy::dotenv().ok();

// save existing environment variables
let env_vars = std::env::vars().collect();

integration.prepare_env();

let tmp_dir = TempDir::new("")?;
@@ -97,6 +103,7 @@ impl IntegrationContext {
bucket,
store,
tmp_dir,
env_vars,
})
}

Expand Down Expand Up @@ -162,6 +169,18 @@ impl IntegrationContext {
};
Ok(())
}

fn restore_env(&self) {
let env_vars: HashMap<_, _> = std::env::vars().collect();
for (key, _) in env_vars {
if !self.env_vars.contains_key(&key) {
std::env::remove_var(key)
}
}
for (key, value) in self.env_vars.iter() {
std::env::set_var(key, value);
}
}
}

impl Drop for IntegrationContext {
@@ -184,6 +203,7 @@ impl Drop for IntegrationContext {
hdfs_cli::delete_dir(&self.bucket).unwrap();
}
};
self.restore_env();
}
}
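The new `env_vars` field and the `Drop` hook give each test a save/restore lifecycle for the process environment. Because environment variables are process-global, tests that construct an `IntegrationContext` are also marked `#[serial]` (see the test changes below). A hypothetical usage sketch, not part of this PR's diff; the test name is a placeholder:

```rust
use deltalake_core::test_utils::{IntegrationContext, StorageIntegration, TestResult, TestTables};
use serial_test::serial;

#[tokio::test]
#[serial]
async fn example_s3_roundtrip() -> TestResult {
    // `new()` snapshots the current environment before `prepare_env()` fills in defaults.
    let context = IntegrationContext::new(StorageIntegration::Amazon)?;
    let _table_uri = context.uri_for_table(TestTables::Simple);
    // ... exercise the table here ...
    // Dropping `context` removes the bucket and calls `restore_env()` to put the
    // original environment back.
    Ok(())
}
```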

@@ -395,38 +415,17 @@ pub mod s3_cli {

     /// Create a new bucket
     pub fn create_bucket(bucket_name: impl AsRef<str>) -> std::io::Result<ExitStatus> {
-        let endpoint = std::env::var(s3_storage_options::AWS_ENDPOINT_URL)
-            .expect("variable ENDPOINT must be set to connect to S3");
-        let region = std::env::var(s3_storage_options::AWS_REGION)
-            .expect("variable AWS_REGION must be set to connect to S3");
         let mut child = Command::new("aws")
-            .args([
-                "s3",
-                "mb",
-                bucket_name.as_ref(),
-                "--endpoint-url",
-                &endpoint,
-                "--region",
-                &region,
-            ])
+            .args(["s3", "mb", bucket_name.as_ref()])
             .spawn()
             .expect("aws command is installed");
         child.wait()
     }
 
     /// delete bucket
     pub fn delete_bucket(bucket_name: impl AsRef<str>) -> std::io::Result<ExitStatus> {
-        let endpoint = std::env::var(s3_storage_options::AWS_ENDPOINT_URL)
-            .expect("variable ENDPOINT must be set to connect to S3");
         let mut child = Command::new("aws")
-            .args([
-                "s3",
-                "rb",
-                bucket_name.as_ref(),
-                "--endpoint-url",
-                &endpoint,
-                "--force",
-            ])
+            .args(["s3", "rb", bucket_name.as_ref(), "--force"])
             .spawn()
             .expect("aws command is installed");
         child.wait()
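Dropping the explicit `--endpoint-url`/`--region` flags works because the spawned `aws` process inherits the parent's environment, and a sufficiently recent AWS CLI (v2.13+ for `AWS_ENDPOINT_URL`) reads the endpoint and region from environment variables on its own. A minimal sketch of that assumption, with a placeholder bucket name:

```rust
use std::process::{Command, ExitStatus};

fn make_bucket_example() -> std::io::Result<ExitStatus> {
    // prepare_env() normally exports these; shown inline here for clarity.
    std::env::set_var("AWS_ENDPOINT_URL", "http://localhost:4566");
    std::env::set_var("AWS_REGION", "us-east-1");
    // The child process inherits the environment, so no --endpoint-url/--region flags are needed.
    Command::new("aws")
        .args(["s3", "mb", "s3://example-test-bucket"])
        .status()
}
```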
@@ -437,16 +436,12 @@
         source: impl AsRef<str>,
         destination: impl AsRef<str>,
     ) -> std::io::Result<ExitStatus> {
-        let endpoint = std::env::var(s3_storage_options::AWS_ENDPOINT_URL)
-            .expect("variable ENDPOINT must be set to connect to S3");
         let mut child = Command::new("aws")
             .args([
                 "s3",
                 "cp",
                 source.as_ref(),
                 destination.as_ref(),
-                "--endpoint-url",
-                &endpoint,
                 "--recursive",
             ])
             .spawn()
@@ -456,13 +451,18 @@

     /// prepare_env
     pub fn prepare_env() {
-        set_env_if_not_set(
-            s3_storage_options::AWS_ENDPOINT_URL,
-            "http://localhost:4566",
-        );
+        match std::env::var(s3_storage_options::AWS_ENDPOINT_URL).ok() {
+            Some(endpoint_url) if endpoint_url.to_lowercase() == "none" => {
+                std::env::remove_var(s3_storage_options::AWS_ENDPOINT_URL)
+            }
+            Some(_) => (),
+            None => std::env::set_var(
+                s3_storage_options::AWS_ENDPOINT_URL,
+                "http://localhost:4566",
+            ),
+        }
         set_env_if_not_set(s3_storage_options::AWS_ACCESS_KEY_ID, "deltalake");
         set_env_if_not_set(s3_storage_options::AWS_SECRET_ACCESS_KEY, "weloverust");
         set_env_if_not_set("AWS_DEFAULT_REGION", "us-east-1");
         set_env_if_not_set(s3_storage_options::AWS_REGION, "us-east-1");
         set_env_if_not_set(s3_storage_options::AWS_S3_LOCKING_PROVIDER, "dynamodb");
         set_env_if_not_set("DYNAMO_LOCK_TABLE_NAME", "test_table");
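`prepare_env` now treats `AWS_ENDPOINT_URL` specially: the literal value `none` unsets it (so the CLI and object store talk to real AWS), an existing value is left alone, and an unset variable defaults to the local emulator at `http://localhost:4566`. The remaining defaults only fill in missing values via `set_env_if_not_set`, whose body is not shown in this hunk; a plausible sketch of that helper (an assumption, not the crate's exact implementation):

```rust
fn set_env_if_not_set(key: impl AsRef<str>, value: impl AsRef<str>) {
    // Only set the variable when the caller has not already configured it.
    if std::env::var(key.as_ref()).is_err() {
        std::env::set_var(key.as_ref(), value.as_ref());
    }
}
```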
@@ -472,44 +472,58 @@

     fn create_dynamodb_table(
         table_name: &str,
-        endpoint_url: &str,
         attr_definitions: &[&str],
         key_schema: &[&str],
     ) -> std::io::Result<ExitStatus> {
         println!("creating table {}", table_name);
-        let args01 = [
+        let args = [
             "dynamodb",
             "create-table",
             "--table-name",
             &table_name,
-            "--endpoint-url",
-            &endpoint_url,
             "--provisioned-throughput",
             "ReadCapacityUnits=10,WriteCapacityUnits=10",
             "--attribute-definitions",
         ];
-        let args: Vec<_> = args01
-            .iter()
-            .chain(attr_definitions.iter())
-            .chain(["--key-schema"].iter())
-            .chain(key_schema)
-            .collect();
         let mut child = Command::new("aws")
             .args(args)
+            .args(attr_definitions.iter())
+            .arg("--key-schema")
+            .args(key_schema)
             .stdout(Stdio::null())
             .spawn()
             .expect("aws command is installed");
-        child.wait()
+        let status = child.wait()?;
+        wait_for_table(table_name)?;
+        Ok(status)
     }

+    fn find_subsequence(haystack: &[u8], needle: &[u8]) -> Option<usize> {
+        haystack
+            .windows(needle.len())
+            .position(|window| window == needle)
+    }
+
+    fn wait_for_table(table_name: &str) -> std::io::Result<()> {
+        let args = ["dynamodb", "describe-table", "--table-name", &table_name];
+        loop {
+            let output = Command::new("aws")
+                .args(args)
+                .output()
+                .expect("aws command is installed");
+            if find_subsequence(&output.stdout, "CREATING".as_bytes()).is_some() {
+                std::thread::sleep(std::time::Duration::from_millis(200));
+                continue;
+            } else {
+                return Ok(());
+            }
+        }
+    }
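The new `wait_for_table` helper polls `aws dynamodb describe-table` until the table's status is no longer `CREATING`. An alternative sketch, not what this PR does, is to lean on the CLI's built-in waiter, which blocks until the table exists:

```rust
use std::process::{Command, ExitStatus};

fn wait_for_table_via_cli(table_name: &str) -> std::io::Result<ExitStatus> {
    // `aws dynamodb wait table-exists` polls internally and returns once the table is ACTIVE.
    Command::new("aws")
        .args(["dynamodb", "wait", "table-exists", "--table-name", table_name])
        .status()
}
```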

     pub fn create_lock_table() -> std::io::Result<ExitStatus> {
-        let endpoint_url = std::env::var(s3_storage_options::AWS_ENDPOINT_URL)
-            .expect("variable AWS_ENDPOINT_URL must be set to connect to S3 emulator");
         let table_name =
             std::env::var("DELTA_DYNAMO_TABLE_NAME").unwrap_or_else(|_| "delta_log".into());
         create_dynamodb_table(
             &table_name,
-            &endpoint_url,
             &[
                 "AttributeName=tablePath,AttributeType=S",
                 "AttributeName=fileName,AttributeType=S",
@@ -521,28 +535,19 @@
         )
     }
 
-    fn delete_dynamodb_table(table_name: &str, endpoint_url: &str) -> std::io::Result<ExitStatus> {
+    fn delete_dynamodb_table(table_name: &str) -> std::io::Result<ExitStatus> {
         let mut child = Command::new("aws")
-            .args([
-                "dynamodb",
-                "delete-table",
-                "--table-name",
-                &table_name,
-                "--endpoint-url",
-                &endpoint_url,
-            ])
+            .args(["dynamodb", "delete-table", "--table-name", &table_name])
             .stdout(Stdio::null())
             .spawn()
             .expect("aws command is installed");
         child.wait()
     }

     pub fn delete_lock_table() -> std::io::Result<ExitStatus> {
-        let endpoint_url = std::env::var(s3_storage_options::AWS_ENDPOINT_URL)
-            .expect("variable AWS_ENDPOINT_URL must be set to connect to S3 emulator");
         let table_name =
             std::env::var("DELTA_DYNAMO_TABLE_NAME").unwrap_or_else(|_| "delta_log".into());
-        delete_dynamodb_table(&table_name, &endpoint_url)
+        delete_dynamodb_table(&table_name)
     }
 }

crates/deltalake-core/tests/integration_checkpoint.rs (2 changes: 2 additions & 0 deletions)
@@ -13,6 +13,7 @@ use std::time::Duration;
use tokio::time::sleep;

#[tokio::test]
#[serial]
async fn cleanup_metadata_fs_test() -> TestResult {
let context = IntegrationContext::new(StorageIntegration::Local)?;
cleanup_metadata_test(&context).await?;
@@ -109,6 +110,7 @@ async fn cleanup_metadata_test(context: &IntegrationContext) -> TestResult {
}

#[tokio::test]
#[serial]
async fn test_issue_1420_cleanup_expired_logs_for() -> DeltaResult<()> {
let _ = std::fs::remove_dir_all("./tests/data/issue_1420");

crates/deltalake-core/tests/integration_concurrent_writes.rs
@@ -6,26 +6,30 @@ use deltalake_core::operations::DeltaOps;
use deltalake_core::protocol::{DeltaOperation, SaveMode};
use deltalake_core::test_utils::{IntegrationContext, StorageIntegration, TestResult, TestTables};
use deltalake_core::{DeltaTable, DeltaTableBuilder};
use serial_test::serial;
use std::collections::HashMap;
use std::future::Future;
use std::iter::FromIterator;
use std::time::Duration;

#[tokio::test]
#[serial]
async fn test_concurrent_writes_local() -> TestResult {
test_concurrent_writes(StorageIntegration::Local).await?;
Ok(())
}

#[cfg(feature = "s3")]
#[tokio::test]
#[serial]
async fn concurrent_writes_s3() -> TestResult {
test_concurrent_writes(StorageIntegration::Amazon).await?;
Ok(())
}

#[cfg(feature = "azure")]
#[tokio::test]
#[serial]
async fn test_concurrent_writes_azure() -> TestResult {
test_concurrent_writes(StorageIntegration::Microsoft).await?;
Ok(())
@@ -35,6 +39,7 @@ async fn test_concurrent_writes_azure() -> TestResult {
#[ignore]
#[cfg(feature = "hdfs")]
#[tokio::test]
#[serial]
async fn test_concurrent_writes_hdfs() -> TestResult {
test_concurrent_writes(StorageIntegration::Hdfs).await?;
Ok(())
crates/deltalake-core/tests/integration_s3_dynamodb.rs (3 changes: 1 addition & 2 deletions)
@@ -148,7 +148,7 @@ async fn test_repair_on_update() -> TestResult<()> {
const WORKERS: i64 = 3;
const COMMITS: i64 = 5;

-#[tokio::test]
+#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
#[serial]
async fn test_concurrent_writers() -> TestResult<()> {
// Goal: a test with multiple writers, very similar to `integration_concurrent_writes`
@@ -260,7 +260,6 @@ fn add_action(name: &str) -> Action {
}

async fn prepare_table(context: &IntegrationContext, table_name: &str) -> TestResult<DeltaTable> {
-    make_client()?.try_create_lock_table().await?;
let table_name = format!("{}_{}", table_name, uuid::Uuid::new_v4());
let table_uri = context.uri_for_table(TestTables::Custom(table_name.to_owned()));
let schema = StructType::new(vec![StructField::new(