merge prep-34-rc3
andygrove committed Dec 14, 2023
2 parents 1a02d14 + e6e84e1 commit 2693384
Showing 45 changed files with 1,261 additions and 371 deletions.
3 changes: 3 additions & 0 deletions .github/workflows/rust.yml
@@ -68,6 +68,9 @@ jobs:
- name: Check workspace without default features
run: cargo check --no-default-features -p datafusion

- name: Check datafusion-common without default features
run: cargo check --tests --no-default-features -p datafusion-common

- name: Check workspace in debug mode
run: cargo check

77 changes: 51 additions & 26 deletions datafusion-cli/src/functions.rs
@@ -31,6 +31,7 @@ use datafusion::logical_expr::Expr;
use datafusion::physical_plan::memory::MemoryExec;
use datafusion::physical_plan::ExecutionPlan;
use datafusion::scalar::ScalarValue;
use parquet::basic::ConvertedType;
use parquet::file::reader::FileReader;
use parquet::file::serialized_reader::SerializedFileReader;
use parquet::file::statistics::Statistics;
@@ -246,6 +247,52 @@ impl TableProvider for ParquetMetadataTable {
}
}

fn convert_parquet_statistics(
value: &Statistics,
converted_type: ConvertedType,
) -> (String, String) {
match (value, converted_type) {
(Statistics::Boolean(val), _) => (val.min().to_string(), val.max().to_string()),
(Statistics::Int32(val), _) => (val.min().to_string(), val.max().to_string()),
(Statistics::Int64(val), _) => (val.min().to_string(), val.max().to_string()),
(Statistics::Int96(val), _) => (val.min().to_string(), val.max().to_string()),
(Statistics::Float(val), _) => (val.min().to_string(), val.max().to_string()),
(Statistics::Double(val), _) => (val.min().to_string(), val.max().to_string()),
(Statistics::ByteArray(val), ConvertedType::UTF8) => {
let min_bytes = val.min();
let max_bytes = val.max();
let min = min_bytes
.as_utf8()
.map(|v| v.to_string())
.unwrap_or_else(|_| min_bytes.to_string());

let max = max_bytes
.as_utf8()
.map(|v| v.to_string())
.unwrap_or_else(|_| max_bytes.to_string());
(min, max)
}
(Statistics::ByteArray(val), _) => (val.min().to_string(), val.max().to_string()),
(Statistics::FixedLenByteArray(val), ConvertedType::UTF8) => {
let min_bytes = val.min();
let max_bytes = val.max();
let min = min_bytes
.as_utf8()
.map(|v| v.to_string())
.unwrap_or_else(|_| min_bytes.to_string());

let max = max_bytes
.as_utf8()
.map(|v| v.to_string())
.unwrap_or_else(|_| max_bytes.to_string());
(min, max)
}
(Statistics::FixedLenByteArray(val), _) => {
(val.min().to_string(), val.max().to_string())
}
}
}

pub struct ParquetMetadataFunc {}

impl TableFunctionImpl for ParquetMetadataFunc {
@@ -326,34 +373,12 @@ impl TableFunctionImpl for ParquetMetadataFunc {
num_values_arr.push(column.num_values());
path_in_schema_arr.push(column.column_path().to_string());
type_arr.push(column.column_type().to_string());
let converted_type = column.column_descr().converted_type();

if let Some(s) = column.statistics() {
let (min_val, max_val) = if s.has_min_max_set() {
let (min_val, max_val) = match s {
Statistics::Boolean(val) => {
(val.min().to_string(), val.max().to_string())
}
Statistics::Int32(val) => {
(val.min().to_string(), val.max().to_string())
}
Statistics::Int64(val) => {
(val.min().to_string(), val.max().to_string())
}
Statistics::Int96(val) => {
(val.min().to_string(), val.max().to_string())
}
Statistics::Float(val) => {
(val.min().to_string(), val.max().to_string())
}
Statistics::Double(val) => {
(val.min().to_string(), val.max().to_string())
}
Statistics::ByteArray(val) => {
(val.min().to_string(), val.max().to_string())
}
Statistics::FixedLenByteArray(val) => {
(val.min().to_string(), val.max().to_string())
}
};
let (min_val, max_val) =
convert_parquet_statistics(s, converted_type);
(Some(min_val), Some(max_val))
} else {
(None, None)
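The new helper renders a column's min/max statistics as strings, decoding UTF8-annotated byte arrays and falling back to the raw byte representation when decoding fails. A minimal sketch of driving it from the parquet crate's metadata API (the wiring is illustrative; the helper itself is private to datafusion-cli):

use std::fs::File;

use parquet::file::reader::{FileReader, SerializedFileReader};

// Sketch only: assumes `convert_parquet_statistics` from functions.rs is in scope.
fn print_min_max(path: &str) -> parquet::errors::Result<()> {
    let file = File::open(path).expect("example path exists");
    let reader = SerializedFileReader::new(file)?;
    for row_group in reader.metadata().row_groups() {
        for column in row_group.columns() {
            if let Some(stats) = column.statistics() {
                if stats.has_min_max_set() {
                    let converted_type = column.column_descr().converted_type();
                    let (min, max) = convert_parquet_statistics(stats, converted_type);
                    println!("{}: min={min} max={max}", column.column_path());
                }
            }
        }
    }
    Ok(())
}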
24 changes: 24 additions & 0 deletions datafusion-cli/src/main.rs
@@ -420,4 +420,28 @@ mod tests {

Ok(())
}

#[tokio::test]
async fn test_parquet_metadata_works_with_strings() -> Result<(), DataFusionError> {
let ctx = SessionContext::new();
ctx.register_udtf("parquet_metadata", Arc::new(ParquetMetadataFunc {}));

// input with string columns
let sql =
"SELECT * FROM parquet_metadata('../parquet-testing/data/data_index_bloom_encoding_stats.parquet')";
let df = ctx.sql(sql).await?;
let rbs = df.collect().await?;

        let expected = [
"+-----------------------------------------------------------------+--------------+--------------------+-----------------------+-----------------+-----------+-------------+------------+----------------+------------+-----------+-----------+------------------+----------------------+-----------------+-----------------+--------------------+--------------------------+-------------------+------------------------+------------------+-----------------------+-------------------------+",
"| filename | row_group_id | row_group_num_rows | row_group_num_columns | row_group_bytes | column_id | file_offset | num_values | path_in_schema | type | stats_min | stats_max | stats_null_count | stats_distinct_count | stats_min_value | stats_max_value | compression | encodings | index_page_offset | dictionary_page_offset | data_page_offset | total_compressed_size | total_uncompressed_size |",
"+-----------------------------------------------------------------+--------------+--------------------+-----------------------+-----------------+-----------+-------------+------------+----------------+------------+-----------+-----------+------------------+----------------------+-----------------+-----------------+--------------------+--------------------------+-------------------+------------------------+------------------+-----------------------+-------------------------+",
"| ../parquet-testing/data/data_index_bloom_encoding_stats.parquet | 0 | 14 | 1 | 163 | 0 | 4 | 14 | \"String\" | BYTE_ARRAY | Hello | today | 0 | | Hello | today | GZIP(GzipLevel(6)) | [BIT_PACKED, RLE, PLAIN] | | | 4 | 152 | 163 |",
"+-----------------------------------------------------------------+--------------+--------------------+-----------------------+-----------------+-----------+-------------+------------+----------------+------------+-----------+-----------+------------------+----------------------+-----------------+-----------------+--------------------+--------------------------+-------------------+------------------------+------------------+-----------------------+-------------------------+"
];
        assert_batches_eq!(expected, &rbs);

Ok(())
}
}
2 changes: 1 addition & 1 deletion datafusion-examples/Cargo.toml
@@ -48,7 +48,7 @@ mimalloc = { version = "0.1", default-features = false }
num_cpus = { workspace = true }
object_store = { workspace = true, features = ["aws", "http"] }
prost = { version = "0.12", default-features = false }
prost-derive = { version = "0.11", default-features = false }
prost-derive = { version = "0.12", default-features = false }
serde = { version = "1.0.136", features = ["derive"] }
serde_json = { workspace = true }
tempfile = { workspace = true }
3 changes: 3 additions & 0 deletions datafusion/common/src/error.rs
@@ -517,6 +517,9 @@ make_error!(not_impl_err, not_impl_datafusion_err, NotImplemented);
// Exposes a macro to create `DataFusionError::Execution`
make_error!(exec_err, exec_datafusion_err, Execution);

// Exposes a macro to create `DataFusionError::Substrait`
make_error!(substrait_err, substrait_datafusion_err, Substrait);

// Exposes a macro to create `DataFusionError::SQL`
#[macro_export]
macro_rules! sql_err {
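Like the existing exec_err! and plan_err! macros, the generated substrait_err! wraps a formatted message in the matching DataFusionError variant and returns it as an Err. A hypothetical call site (the function and message are illustrative, not part of this commit):

use datafusion_common::{substrait_err, Result};

// Hypothetical guard in a Substrait consumer.
fn require_supported(rel: &str, supported: bool) -> Result<()> {
    if !supported {
        // Expands to Err(DataFusionError::Substrait(format!(...)))
        return substrait_err!("Unsupported Substrait relation: {rel}");
    }
    Ok(())
}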
1 change: 1 addition & 0 deletions datafusion/common/src/file_options/file_type.rs
@@ -109,6 +109,7 @@ mod tests {
use std::str::FromStr;

#[test]
#[cfg(feature = "parquet")]
fn from_str() {
for (ext, file_type) in [
("csv", FileType::CSV),
8 changes: 8 additions & 0 deletions datafusion/common/src/file_options/mod.rs
@@ -299,6 +299,7 @@ impl Display for FileTypeWriterOptions {
mod tests {
use std::collections::HashMap;

#[cfg(feature = "parquet")]
use parquet::{
basic::{Compression, Encoding, ZstdLevel},
file::properties::{EnabledStatistics, WriterVersion},
@@ -313,9 +314,11 @@ mod tests {

use crate::Result;

#[cfg(feature = "parquet")]
use super::{parquet_writer::ParquetWriterOptions, StatementOptions};

#[test]
#[cfg(feature = "parquet")]
fn test_writeroptions_parquet_from_statement_options() -> Result<()> {
let mut option_map: HashMap<String, String> = HashMap::new();
option_map.insert("max_row_group_size".to_owned(), "123".to_owned());
@@ -386,6 +389,7 @@
}

#[test]
#[cfg(feature = "parquet")]
fn test_writeroptions_parquet_column_specific() -> Result<()> {
let mut option_map: HashMap<String, String> = HashMap::new();

@@ -506,6 +510,8 @@
}

#[test]
// for StatementOptions
#[cfg(feature = "parquet")]
fn test_writeroptions_csv_from_statement_options() -> Result<()> {
let mut option_map: HashMap<String, String> = HashMap::new();
option_map.insert("header".to_owned(), "true".to_owned());
@@ -533,6 +539,8 @@
}

#[test]
// for StatementOptions
#[cfg(feature = "parquet")]
fn test_writeroptions_json_from_statement_options() -> Result<()> {
let mut option_map: HashMap<String, String> = HashMap::new();
option_map.insert("compression".to_owned(), "gzip".to_owned());
1 change: 1 addition & 0 deletions datafusion/common/src/test_util.rs
@@ -285,6 +285,7 @@
}

#[test]
#[cfg(feature = "parquet")]
fn test_happy() {
let res = arrow_test_data();
assert!(PathBuf::from(res).is_dir());
35 changes: 26 additions & 9 deletions datafusion/core/src/dataframe/mod.rs
@@ -1013,11 +1013,16 @@ impl DataFrame {
))
}

/// Write this DataFrame to the referenced table
/// Write this DataFrame to the referenced table by name.
    /// This method uses the same underlying implementation
/// as the SQL Insert Into statement.
/// Unlike most other DataFrame methods, this method executes
/// eagerly, writing data, and returning the count of rows written.
/// as the SQL Insert Into statement. Unlike most other DataFrame methods,
/// this method executes eagerly. Data is written to the table using an
/// execution plan returned by the [TableProvider]'s insert_into method.
/// Refer to the documentation of the specific [TableProvider] to determine
/// the expected data returned by the insert_into plan via this method.
    /// For the built-in ListingTable provider, a single [RecordBatch] containing
/// a single column and row representing the count of total rows written
/// is returned.
pub async fn write_table(
self,
table_name: &str,
@@ -1271,11 +1276,12 @@ impl DataFrame {
/// ```
pub async fn cache(self) -> Result<DataFrame> {
let context = SessionContext::new_with_state(self.session_state.clone());
let mem_table = MemTable::try_new(
SchemaRef::from(self.schema().clone()),
self.collect_partitioned().await?,
)?;

        // Use the physical plan's schema, which is consistent with the collected
        // output partitions (the logical schema may differ, e.g. in nullability)
let plan = self.clone().create_physical_plan().await?;
let schema = plan.schema();
let task_ctx = Arc::new(self.task_ctx());
let partitions = collect_partitioned(plan, task_ctx).await?;
let mem_table = MemTable::try_new(schema, partitions)?;
context.read_table(Arc::new(mem_table))
}
}
@@ -2633,6 +2639,17 @@ mod tests {
Ok(())
}

#[tokio::test]
async fn test_cache_mismatch() -> Result<()> {
let ctx = SessionContext::new();
let df = ctx
.sql("SELECT CASE WHEN true THEN NULL ELSE 1 END")
.await?;
let cache_df = df.cache().await;
assert!(cache_df.is_ok());
Ok(())
}

#[tokio::test]
async fn cache_test() -> Result<()> {
let df = test_table()
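A hedged sketch of the eager write path described by the revised write_table docs (the table name and query are made up; the sink table must already be registered with the context):

use datafusion::dataframe::DataFrameWriteOptions;
use datafusion::error::Result;
use datafusion::prelude::*;

async fn write_example(ctx: &SessionContext) -> Result<()> {
    let df = ctx.sql("SELECT 1 AS a").await?;
    // Executes eagerly; for the built-in ListingTable the result is a single
    // RecordBatch with one column/row holding the count of rows written.
    let counts = df.write_table("sink", DataFrameWriteOptions::new()).await?;
    println!("{counts:?}");
    Ok(())
}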
8 changes: 7 additions & 1 deletion datafusion/core/src/datasource/listing/helpers.rs
@@ -141,12 +141,18 @@ const CONCURRENCY_LIMIT: usize = 100;

/// Partition the list of files into `n` groups
pub fn split_files(
partitioned_files: Vec<PartitionedFile>,
mut partitioned_files: Vec<PartitionedFile>,
n: usize,
) -> Vec<Vec<PartitionedFile>> {
if partitioned_files.is_empty() {
return vec![];
}

    // ObjectStore::list does not guarantee any consistent order, and for some
    // implementations, such as LocalFileSystem, it may be inconsistent. Thus,
    // sort files by path to ensure consistent plans when run more than once.
partitioned_files.sort_by(|a, b| a.path().cmp(b.path()));

// effectively this is div with rounding up instead of truncating
let chunk_size = (partitioned_files.len() + n - 1) / n;
partitioned_files
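The chunk-size expression is integer ceiling division; a quick worked check (values are illustrative):

#[test]
fn ceil_div_example() {
    // (len + n - 1) / n rounds up: 7 files into 3 groups -> chunks of 3, 3, 1.
    let (len, n) = (7usize, 3usize);
    assert_eq!((len + n - 1) / n, 3);
}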
5 changes: 5 additions & 0 deletions datafusion/core/src/datasource/listing/mod.rs
@@ -109,6 +109,11 @@ impl PartitionedFile {
let size = std::fs::metadata(path.clone())?.len();
Ok(Self::new(path, size))
}

/// Return the path of this partitioned file
pub fn path(&self) -> &Path {
&self.object_meta.location
}
}

impl From<ObjectMeta> for PartitionedFile {
5 changes: 5 additions & 0 deletions datafusion/core/src/datasource/listing/url.rs
@@ -116,6 +116,7 @@ impl ListingTableUrl {
/// Get object store for specified input_url
/// if input_url is actually not a url, we assume it is a local file path
/// if we have a local path, create it if not exists so ListingTableUrl::parse works
#[deprecated(note = "Use parse")]
pub fn parse_create_local_if_not_exists(
s: impl AsRef<str>,
is_directory: bool,
@@ -131,6 +132,10 @@ impl ListingTableUrl {
if is_directory {
fs::create_dir_all(path)?;
} else {
// ensure parent directory exists
if let Some(parent) = path.parent() {
fs::create_dir_all(parent)?;
}
fs::File::create(path)?;
}
}
17 changes: 5 additions & 12 deletions datafusion/core/src/datasource/listing_table_factory.rs
@@ -148,19 +148,18 @@ impl TableProviderFactory for ListingTableFactory {
.unwrap_or(false)
};

let create_local_path = statement_options
.take_bool_option("create_local_path")?
.unwrap_or(false);
let single_file = statement_options
.take_bool_option("single_file")?
.unwrap_or(false);

// Backwards compatibility
// Backwards compatibility (#8547)
if let Some(s) = statement_options.take_str_option("insert_mode") {
if !s.eq_ignore_ascii_case("append_new_files") {
return plan_err!("Unknown or unsupported insert mode {s}. Only append_to_file supported");
return plan_err!("Unknown or unsupported insert mode {s}. Only append_new_files supported");
}
}
statement_options.take_bool_option("create_local_path")?;

let file_type = file_format.file_type();

// Use remaining options and session state to build FileTypeWriterOptions
@@ -199,13 +198,7 @@ impl TableProviderFactory for ListingTableFactory {
FileType::AVRO => file_type_writer_options,
};

let table_path = match create_local_path {
true => ListingTableUrl::parse_create_local_if_not_exists(
&cmd.location,
!single_file,
),
false => ListingTableUrl::parse(&cmd.location),
}?;
let table_path = ListingTableUrl::parse(&cmd.location)?;

let options = ListingOptions::new(file_format)
.with_collect_stat(state.config().collect_statistics())
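For reference, a hypothetical DDL statement that still passes the retained compatibility check; any other insert_mode value is now a plan error, and create_local_path is parsed but ignored (table name, schema, and location are illustrative):

use datafusion::error::Result;
use datafusion::prelude::SessionContext;

async fn register_sink(ctx: &SessionContext) -> Result<()> {
    // Assumed CREATE EXTERNAL TABLE OPTIONS syntax; only `insert_mode` with
    // the value 'append_new_files' is accepted after this change.
    ctx.sql(
        "CREATE EXTERNAL TABLE sink (a INT) STORED AS CSV \
         LOCATION '/tmp/sink/' \
         OPTIONS (insert_mode 'append_new_files')",
    )
    .await?;
    Ok(())
}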
9 changes: 6 additions & 3 deletions datafusion/core/src/datasource/physical_plan/parquet/mod.rs
@@ -468,8 +468,10 @@ impl FileOpener for ParquetOpener {
ParquetRecordBatchStreamBuilder::new_with_options(reader, options)
.await?;

let file_schema = builder.schema().clone();

let (schema_mapping, adapted_projections) =
schema_adapter.map_schema(builder.schema())?;
schema_adapter.map_schema(&file_schema)?;
// let predicate = predicate.map(|p| reassign_predicate_columns(p, builder.schema(), true)).transpose()?;

let mask = ProjectionMask::roots(
@@ -481,8 +483,8 @@ impl FileOpener for ParquetOpener {
if let Some(predicate) = pushdown_filters.then_some(predicate).flatten() {
let row_filter = row_filter::build_row_filter(
&predicate,
builder.schema().as_ref(),
table_schema.as_ref(),
&file_schema,
&table_schema,
builder.metadata(),
reorder_predicates,
&file_metrics,
Expand All @@ -507,6 +509,7 @@ impl FileOpener for ParquetOpener {
let file_metadata = builder.metadata().clone();
let predicate = pruning_predicate.as_ref().map(|p| p.as_ref());
let mut row_groups = row_groups::prune_row_groups_by_statistics(
&file_schema,
builder.parquet_schema(),
file_metadata.row_groups(),
file_range,
…