Merge branch 'main' into release-gil
rtyler authored May 15, 2024
2 parents d6b3365 + fae1406 commit ddcda30
Showing 9 changed files with 207 additions and 105 deletions.
70 changes: 47 additions & 23 deletions crates/aws/src/storage.rs
@@ -1,5 +1,6 @@
//! AWS S3 storage backend.

use aws_config::meta::region::ProvideRegion;
use aws_config::provider_config::ProviderConfig;
use aws_config::{Region, SdkConfig};
use bytes::Bytes;
@@ -20,6 +21,7 @@ use std::str::FromStr;
use std::sync::Arc;
use std::time::Duration;
use tokio::io::AsyncWrite;
use tracing::warn;
use url::Url;

use crate::errors::DynamoDbConfigError;
@@ -67,9 +69,9 @@ impl ObjectStoreFactory for S3ObjectStoreFactory {
fn parse_url_opts(
&self,
url: &Url,
options: &StorageOptions,
storage_options: &StorageOptions,
) -> DeltaResult<(ObjectStoreRef, Path)> {
let options = self.with_env_s3(options);
let options = self.with_env_s3(storage_options);
let (inner, prefix) = parse_url_opts(
url,
options.0.iter().filter_map(|(key, value)| {
@@ -87,7 +89,7 @@ impl ObjectStoreFactory for S3ObjectStoreFactory {
{
Ok((store, prefix))
} else {
let s3_options = S3StorageOptions::from_map(&options.0)?;
let s3_options = S3StorageOptions::from_map(&storage_options.0)?;

let store = S3StorageBackend::try_new(
store,
@@ -140,7 +142,6 @@ impl S3StorageOptions {
.filter(|(k, _)| !s3_constants::S3_OPTS.contains(&k.as_str()))
.map(|(k, v)| (k.to_owned(), v.to_owned()))
.collect();

// Copy web identity values provided in options but not the environment into the environment
// to get picked up by the `from_k8s_env` call in `get_web_identity_provider`.
Self::ensure_env_var(options, s3_constants::AWS_REGION);
@@ -175,41 +176,47 @@ impl S3StorageOptions {
.unwrap_or(true);
let imds_timeout =
Self::u64_or_default(options, s3_constants::AWS_EC2_METADATA_TIMEOUT, 100);
let (loader, provider_config) =
if let Some(endpoint_url) = str_option(options, s3_constants::AWS_ENDPOINT_URL) {
let (region_provider, provider_config) = Self::create_provider_config(
str_option(options, s3_constants::AWS_REGION)
.or_else(|| std::env::var("AWS_DEFAULT_REGION").ok())
.map_or(Region::from_static("custom"), Region::new),
)?;
let loader = aws_config::from_env()
.endpoint_url(endpoint_url)
.region(region_provider);
(loader, provider_config)
} else {
let (region_provider, provider_config) = Self::create_provider_config(
crate::credentials::new_region_provider(disable_imds, imds_timeout),
)?;
(
aws_config::from_env().region(region_provider),
provider_config,
)
};

let region_provider = crate::credentials::new_region_provider(disable_imds, imds_timeout);
let region = execute_sdk_future(region_provider.region())?;
let provider_config = ProviderConfig::default().with_region(region);
let credentials_provider = crate::credentials::ConfiguredCredentialChain::new(
disable_imds,
imds_timeout,
&provider_config,
);
#[cfg(feature = "native-tls")]
let sdk_config = execute_sdk_future(
aws_config::from_env()
loader
.http_client(native::use_native_tls_client(
str_option(options, s3_constants::AWS_ALLOW_HTTP)
.map(|val| str_is_truthy(&val))
.unwrap_or(false),
))
.credentials_provider(credentials_provider)
.region(region_provider)
.load(),
)?;
#[cfg(feature = "rustls")]
let sdk_config = execute_sdk_future(
aws_config::from_env()
.credentials_provider(credentials_provider)
.region(region_provider)
.load(),
)?;

let sdk_config =
if let Some(endpoint_url) = str_option(options, s3_constants::AWS_ENDPOINT_URL) {
sdk_config.to_builder().endpoint_url(endpoint_url).build()
} else {
sdk_config
};
execute_sdk_future(loader.credentials_provider(credentials_provider).load())?;

Ok(Self {
virtual_hosted_style_request,
locking_provider: str_option(options, s3_constants::AWS_S3_LOCKING_PROVIDER),
@@ -230,6 +237,16 @@ impl S3StorageOptions {
self.sdk_config.region()
}

fn create_provider_config<T: ProvideRegion>(
region_provider: T,
) -> DeltaResult<(T, ProviderConfig)> {
let region = execute_sdk_future(region_provider.region())?;
Ok((
region_provider,
ProviderConfig::default().with_region(region),
))
}

fn u64_or_default(map: &HashMap<String, String>, key: &str, default: u64) -> u64 {
str_option(map, key)
.and_then(|v| v.parse().ok())
@@ -494,8 +511,15 @@ pub mod s3_constants {
}

pub(crate) fn str_option(map: &HashMap<String, String>, key: &str) -> Option<String> {
map.get(key)
.map_or_else(|| std::env::var(key).ok(), |v| Some(v.to_owned()))
if let Some(s) = map.get(key) {
return Some(s.to_owned());
}

if let Some(s) = map.get(&key.to_ascii_lowercase()) {
return Some(s.to_owned());
}

std::env::var(key).ok()
}

#[cfg(test)]
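
For context, here is a minimal sketch of how such storage options are typically assembled when targeting an S3-compatible endpoint such as MinIO or LocalStack. The key names mirror the s3_constants referenced above; per the updated str_option, each lookup falls back to the lowercase key and then to the process environment. The endpoint URL, port, and region values are illustrative only.

use std::collections::HashMap;

fn main() {
    // Hypothetical option map; in delta-rs these keys would normally be handed to
    // S3StorageOptions::from_map(&options) as shown in the diff above.
    let options: HashMap<String, String> = HashMap::from([
        ("AWS_ENDPOINT_URL".to_string(), "http://localhost:9000".to_string()),
        ("AWS_REGION".to_string(), "us-east-1".to_string()),
        ("AWS_ALLOW_HTTP".to_string(), "true".to_string()),
    ]);

    // Mirrors the lookup order introduced for str_option: exact key, lowercase key,
    // then the process environment.
    let lookup = |key: &str| -> Option<String> {
        options
            .get(key)
            .or_else(|| options.get(&key.to_ascii_lowercase()))
            .cloned()
            .or_else(|| std::env::var(key).ok())
    };

    // With AWS_ENDPOINT_URL set, the SDK loader is built against that endpoint and,
    // absent AWS_REGION/AWS_DEFAULT_REGION, a static "custom" region.
    println!("endpoint = {:?}", lookup("AWS_ENDPOINT_URL"));
    println!("region   = {:?}", lookup("AWS_REGION"));
}
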
44 changes: 36 additions & 8 deletions crates/core/src/operations/convert_to_delta.rs
@@ -1,20 +1,23 @@
//! Command for converting a Parquet table to a Delta table in place
// https://github.com/delta-io/delta/blob/1d5dd774111395b0c4dc1a69c94abc169b1c83b6/spark/src/main/scala/org/apache/spark/sql/delta/commands/ConvertToDeltaCommand.scala

use crate::operations::get_num_idx_cols_and_stats_columns;
use crate::{
kernel::{Add, DataType, Schema, StructField},
logstore::{LogStore, LogStoreRef},
operations::create::CreateBuilder,
protocol::SaveMode,
table::builder::ensure_table_uri,
table::config::DeltaConfigKey,
writer::stats::stats_from_parquet_metadata,
DeltaResult, DeltaTable, DeltaTableError, ObjectStoreError, NULL_PARTITION_VALUE_DATA_PATH,
};
use arrow::{datatypes::Schema as ArrowSchema, error::ArrowError};
use futures::{
future::{self, BoxFuture},
TryStreamExt,
};
use indexmap::IndexMap;
use parquet::{
arrow::async_reader::{ParquetObjectReader, ParquetRecordBatchStreamBuilder},
errors::ParquetError,
@@ -284,6 +287,10 @@ impl ConvertToDeltaBuilder {
// A map of all unique partition columns in a Parquet table to their StructField
let mut partition_schema_fields = HashMap::new();

// Obtain settings on which columns to skip collecting stats for, if any
let (num_indexed_cols, stats_columns) =
get_num_idx_cols_and_stats_columns(None, self.configuration.clone());

for file in files {
// A HashMap from partition column to value for this parquet file only
let mut partition_values = HashMap::new();
@@ -328,6 +335,24 @@ impl ConvertToDeltaBuilder {
subpath = iter.next();
}

let batch_builder = ParquetRecordBatchStreamBuilder::new(ParquetObjectReader::new(
object_store.clone(),
file.clone(),
))
.await?;

// Fetch the stats
let parquet_metadata = batch_builder.metadata();
let stats = stats_from_parquet_metadata(
&IndexMap::from_iter(partition_values.clone().into_iter()),
parquet_metadata.as_ref(),
num_indexed_cols,
&stats_columns,
)
.map_err(|e| Error::DeltaTable(e.into()))?;
let stats_string =
serde_json::to_string(&stats).map_err(|e| Error::DeltaTable(e.into()))?;

actions.push(
Add {
path: percent_decode_str(file.location.as_ref())
@@ -349,19 +374,13 @@ impl ConvertToDeltaBuilder {
.collect(),
modification_time: file.last_modified.timestamp_millis(),
data_change: true,
stats: Some(stats_string),
..Default::default()
}
.into(),
);

let mut arrow_schema = ParquetRecordBatchStreamBuilder::new(ParquetObjectReader::new(
object_store.clone(),
file,
))
.await?
.schema()
.as_ref()
.clone();
let mut arrow_schema = batch_builder.schema().as_ref().clone();

// Arrow schema of Parquet files may have conflicting metadata
// Since Arrow schema metadata is not used to generate Delta table schema, we set the metadata field to an empty HashMap
@@ -584,6 +603,15 @@ mod tests {
"part-00000-d22c627d-9655-4153-9527-f8995620fa42-c000.snappy.parquet"
);

let Some(Scalar::Struct(min_values, _)) = action.min_values() else {
panic!("Missing min values");
};
assert_eq!(min_values, vec![Scalar::Date(18628), Scalar::Integer(1)]);
let Some(Scalar::Struct(max_values, _)) = action.max_values() else {
panic!("Missing max values");
};
assert_eq!(max_values, vec![Scalar::Date(18632), Scalar::Integer(5)]);

assert_delta_table(
table,
path,
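
The stats string attached to each Add action follows the Delta protocol's per-file statistics layout (numRecords, minValues, maxValues, nullCount). A minimal sketch, not delta-rs's actual serializer, of what that JSON looks like for a hypothetical table with date and id columns; the concrete values are illustrative:

use serde_json::json;

fn main() {
    // Illustrative payload only; delta-rs derives these values from the Parquet
    // footer via stats_from_parquet_metadata and then serializes them with serde_json.
    let stats = json!({
        "numRecords": 5,
        "minValues": { "date": "2021-01-01", "id": 1 },
        "maxValues": { "date": "2021-01-05", "id": 5 },
        "nullCount": { "date": 0, "id": 0 }
    });
    let stats_string = serde_json::to_string(&stats).expect("stats serialize to JSON");
    println!("{stats_string}");
}
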
27 changes: 27 additions & 0 deletions crates/core/src/operations/mod.rs
@@ -239,6 +239,33 @@ impl AsRef<DeltaTable> for DeltaOps {
}
}

/// Get the num_idx_columns and stats_columns from the table configuration in the state.
/// If table_config does not exist (which can only occur on the first write action), it takes
/// the configuration that was passed to the writer builder.
pub fn get_num_idx_cols_and_stats_columns(
config: Option<crate::table::config::TableConfig<'_>>,
configuration: HashMap<String, Option<String>>,
) -> (i32, Option<Vec<String>>) {
let (num_index_cols, stats_columns) = match &config {
Some(conf) => (conf.num_indexed_cols(), conf.stats_columns()),
_ => (
configuration
.get("delta.dataSkippingNumIndexedCols")
.and_then(|v| v.clone().map(|v| v.parse::<i32>().unwrap()))
.unwrap_or(crate::table::config::DEFAULT_NUM_INDEX_COLS),
configuration
.get("delta.dataSkippingStatsColumns")
.and_then(|v| v.as_ref().map(|v| v.split(',').collect::<Vec<&str>>())),
),
};
(
num_index_cols,
stats_columns
.clone()
.map(|v| v.iter().map(|v| v.to_string()).collect::<Vec<String>>()),
)
}

#[cfg(feature = "datafusion")]
mod datafusion_utils {
use datafusion::execution::context::SessionState;
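
A hedged usage sketch of the relocated helper, assuming it is reachable at deltalake_core::operations as the diff suggests; it resolves the data-skipping settings from write-time configuration when no table config exists yet (the first write):

use std::collections::HashMap;

fn main() {
    let configuration: HashMap<String, Option<String>> = HashMap::from([
        (
            "delta.dataSkippingNumIndexedCols".to_string(),
            Some("8".to_string()),
        ),
        (
            "delta.dataSkippingStatsColumns".to_string(),
            Some("id,created_at".to_string()),
        ),
    ]);

    // With no existing table config (first write), the values fall back to the map above.
    let (num_indexed_cols, stats_columns) =
        deltalake_core::operations::get_num_idx_cols_and_stats_columns(None, configuration);

    assert_eq!(num_indexed_cols, 8);
    assert_eq!(
        stats_columns,
        Some(vec!["id".to_string(), "created_at".to_string()])
    );
}
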
9 changes: 1 addition & 8 deletions crates/core/src/operations/transaction/conflict_checker.rs
@@ -194,7 +194,7 @@ impl<'a> TransactionInfo<'a> {
#[cfg(not(feature = "datafusion"))]
/// Files read by the transaction
pub fn read_files(&self) -> Result<impl Iterator<Item = Add> + '_, CommitConflictError> {
Ok(self.read_snapshot.file_actions().unwrap().into_iter())
Ok(self.read_snapshot.file_actions().unwrap())
}

/// Whether the whole table was read during the transaction
@@ -311,13 +311,6 @@ impl WinningCommitSummary {
}
}

// pub fn only_add_files(&self) -> bool {
// !self
// .actions
// .iter()
// .any(|action| matches!(action, Action::remove(_)))
// }

pub fn is_blind_append(&self) -> Option<bool> {
self.commit_info
.as_ref()
4 changes: 2 additions & 2 deletions crates/core/src/operations/transaction/protocol.rs
@@ -80,8 +80,8 @@ impl ProtocolChecker {
}

/// checks if table contains timestamp_ntz in any field including nested fields.
pub fn contains_timestampntz(&self, fields: &Vec<StructField>) -> bool {
fn check_vec_fields(fields: &Vec<StructField>) -> bool {
pub fn contains_timestampntz(&self, fields: &[StructField]) -> bool {
fn check_vec_fields(fields: &[StructField]) -> bool {
fields.iter().any(|f| _check_type(f.data_type()))
}

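
The signature change above only loosens the parameter from &Vec<StructField> to &[StructField]; the recursive walk itself is unchanged. A simplified, self-contained sketch of that kind of nested check, using local stand-in types rather than delta-rs's kernel types:

// Stand-in types for illustration; delta-rs uses kernel::{StructField, DataType}.
enum DataType {
    TimestampNtz,
    Integer,
    Struct(Vec<Field>),
}

struct Field {
    data_type: DataType,
}

fn contains_timestamp_ntz(fields: &[Field]) -> bool {
    fn check(dt: &DataType) -> bool {
        match dt {
            DataType::TimestampNtz => true,
            DataType::Struct(fields) => fields.iter().any(|f| check(&f.data_type)),
            DataType::Integer => false,
        }
    }
    fields.iter().any(|f| check(&f.data_type))
}

fn main() {
    // A timestamp without timezone nested inside a struct column is still detected.
    let fields = vec![Field {
        data_type: DataType::Struct(vec![Field {
            data_type: DataType::TimestampNtz,
        }]),
    }];
    assert!(contains_timestamp_ntz(&fields));
    // Accepting &[Field] means arrays, slices, and Vecs all coerce without an extra allocation.
    assert!(!contains_timestamp_ntz(&[Field { data_type: DataType::Integer }]));
}
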
30 changes: 1 addition & 29 deletions crates/core/src/operations/write.rs
@@ -58,7 +58,6 @@ use crate::logstore::LogStoreRef;
use crate::operations::cast::{cast_record_batch, merge_schema};
use crate::protocol::{DeltaOperation, SaveMode};
use crate::storage::ObjectStoreRef;
use crate::table::config::DEFAULT_NUM_INDEX_COLS;
use crate::table::state::DeltaTableState;
use crate::table::Constraint as DeltaConstraint;
use crate::writer::record_batch::divide_by_partition_values;
@@ -759,7 +758,7 @@ impl std::future::IntoFuture for WriteBuilder {
.map(|snapshot| snapshot.table_config());

let (num_indexed_cols, stats_columns) =
get_num_idx_cols_and_stats_columns(config, this.configuration);
super::get_num_idx_cols_and_stats_columns(config, this.configuration);

let writer_stats_config = WriterStatsConfig {
num_indexed_cols,
@@ -922,33 +921,6 @@ fn try_cast_batch(from_fields: &Fields, to_fields: &Fields) -> Result<(), ArrowError> {
Ok(())
}

/// Get the num_idx_columns and stats_columns from the table configuration in the state
/// If table_config does not exist (only can occur in the first write action) it takes
/// the configuration that was passed to the writerBuilder.
pub fn get_num_idx_cols_and_stats_columns(
config: Option<crate::table::config::TableConfig<'_>>,
configuration: HashMap<String, Option<String>>,
) -> (i32, Option<Vec<String>>) {
let (num_index_cols, stats_columns) = match &config {
Some(conf) => (conf.num_indexed_cols(), conf.stats_columns()),
_ => (
configuration
.get("delta.dataSkippingNumIndexedCols")
.and_then(|v| v.clone().map(|v| v.parse::<i32>().unwrap()))
.unwrap_or(DEFAULT_NUM_INDEX_COLS),
configuration
.get("delta.dataSkippingStatsColumns")
.and_then(|v| v.as_ref().map(|v| v.split(',').collect::<Vec<&str>>())),
),
};
(
num_index_cols,
stats_columns
.clone()
.map(|v| v.iter().map(|v| v.to_string()).collect::<Vec<String>>()),
)
}

#[cfg(test)]
mod tests {
use super::*;
2 changes: 1 addition & 1 deletion crates/core/src/writer/record_batch.rs
@@ -520,7 +520,7 @@ mod tests {
use crate::DeltaOps;

let table = crate::writer::test_utils::create_bare_table();
let partition_cols = vec!["modified".to_string()];
let partition_cols = ["modified".to_string()];
let delta_schema = r#"
{"type" : "struct",
"fields" : [