Support parquet_metadata for datafusion-cli (#8413)
* Support parquet_metadata for datafusion-cli
* make tomlfmt happy
* display like duckdb
* add test & fix single quote

Signed-off-by: veeupup <[email protected]>
Veeupup authored Dec 7, 2023
1 parent 5e8b0e0 commit 6767ea3
Showing 4 changed files with 258 additions and 2 deletions.
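Based on the test added in this commit, the new table function can be invoked from the datafusion-cli prompt as shown below; both quoting styles are accepted (the path is the test fixture exercised later in this diff):

    SELECT * FROM parquet_metadata('../datafusion/core/tests/data/fixed_size_list_array.parquet');
    -- the double-quoted form parses as a column reference and is handled equivalently:
    SELECT * FROM parquet_metadata("../datafusion/core/tests/data/fixed_size_list_array.parquet");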
1 change: 1 addition & 0 deletions datafusion-cli/Cargo.lock

Some generated files are not rendered by default.

1 change: 1 addition & 0 deletions datafusion-cli/Cargo.toml
@@ -40,6 +40,7 @@
env_logger = "0.9"
mimalloc = { version = "0.1", default-features = false }
object_store = { version = "0.8.0", features = ["aws", "gcp"] }
parking_lot = { version = "0.12" }
parquet = { version = "49.0.0", default-features = false }
regex = "1.8"
rustyline = "11.0"
tokio = { version = "1.24", features = ["macros", "rt", "rt-multi-thread", "sync", "parking_lot"] }
Expand Down
223 changes: 221 additions & 2 deletions datafusion-cli/src/functions.rs

@@ -16,12 +16,26 @@
// under the License.

//! Functions that are query-able and searchable via the `\h` command
-use arrow::array::StringArray;
-use arrow::datatypes::{DataType, Field, Schema};
+use arrow::array::{Int64Array, StringArray};
+use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
use arrow::record_batch::RecordBatch;
use arrow::util::pretty::pretty_format_batches;
use async_trait::async_trait;
use datafusion::common::DataFusionError;
use datafusion::common::{plan_err, Column};
use datafusion::datasource::function::TableFunctionImpl;
use datafusion::datasource::TableProvider;
use datafusion::error::Result;
use datafusion::execution::context::SessionState;
use datafusion::logical_expr::Expr;
use datafusion::physical_plan::memory::MemoryExec;
use datafusion::physical_plan::ExecutionPlan;
use datafusion::scalar::ScalarValue;
use parquet::file::reader::FileReader;
use parquet::file::serialized_reader::SerializedFileReader;
use parquet::file::statistics::Statistics;
use std::fmt;
use std::fs::File;
use std::str::FromStr;
use std::sync::Arc;

@@ -196,3 +210,208 @@ pub fn display_all_functions() -> Result<()> {
println!("{}", pretty_format_batches(&[batch]).unwrap());
Ok(())
}

/// `parquet_metadata` table function
struct ParquetMetadataTable {
    schema: SchemaRef,
    batch: RecordBatch,
}

#[async_trait]
impl TableProvider for ParquetMetadataTable {
    fn as_any(&self) -> &dyn std::any::Any {
        self
    }

    fn schema(&self) -> arrow::datatypes::SchemaRef {
        self.schema.clone()
    }

    fn table_type(&self) -> datafusion::logical_expr::TableType {
        datafusion::logical_expr::TableType::Base
    }

    async fn scan(
        &self,
        _state: &SessionState,
        projection: Option<&Vec<usize>>,
        _filters: &[Expr],
        _limit: Option<usize>,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        // Serve the pre-computed metadata batch from memory.
        Ok(Arc::new(MemoryExec::try_new(
            &[vec![self.batch.clone()]],
            TableProvider::schema(self),
            projection.cloned(),
        )?))
    }
}

pub struct ParquetMetadataFunc {}

impl TableFunctionImpl for ParquetMetadataFunc {
    fn call(&self, exprs: &[Expr]) -> Result<Arc<dyn TableProvider>> {
        let filename = match exprs.get(0) {
            Some(Expr::Literal(ScalarValue::Utf8(Some(s)))) => s, // single quote: parquet_metadata('x.parquet')
            Some(Expr::Column(Column { name, .. })) => name, // double quote: parquet_metadata("x.parquet")
            _ => {
                return plan_err!(
                    "parquet_metadata requires string argument as its input"
                );
            }
        };

        let file = File::open(filename.clone())?;
        let reader = SerializedFileReader::new(file)?;
        let metadata = reader.metadata();

        let schema = Arc::new(Schema::new(vec![
            Field::new("filename", DataType::Utf8, true),
            Field::new("row_group_id", DataType::Int64, true),
            Field::new("row_group_num_rows", DataType::Int64, true),
            Field::new("row_group_num_columns", DataType::Int64, true),
            Field::new("row_group_bytes", DataType::Int64, true),
            Field::new("column_id", DataType::Int64, true),
            Field::new("file_offset", DataType::Int64, true),
            Field::new("num_values", DataType::Int64, true),
            Field::new("path_in_schema", DataType::Utf8, true),
            Field::new("type", DataType::Utf8, true),
            Field::new("stats_min", DataType::Utf8, true),
            Field::new("stats_max", DataType::Utf8, true),
            Field::new("stats_null_count", DataType::Int64, true),
            Field::new("stats_distinct_count", DataType::Int64, true),
            Field::new("stats_min_value", DataType::Utf8, true),
            Field::new("stats_max_value", DataType::Utf8, true),
            Field::new("compression", DataType::Utf8, true),
            Field::new("encodings", DataType::Utf8, true),
            Field::new("index_page_offset", DataType::Int64, true),
            Field::new("dictionary_page_offset", DataType::Int64, true),
            Field::new("data_page_offset", DataType::Int64, true),
            Field::new("total_compressed_size", DataType::Int64, true),
            Field::new("total_uncompressed_size", DataType::Int64, true),
        ]));

        // Construct the RecordBatch from the parquet metadata: one output row
        // per (row group, column chunk) pair.
        let mut filename_arr = vec![];
        let mut row_group_id_arr = vec![];
        let mut row_group_num_rows_arr = vec![];
        let mut row_group_num_columns_arr = vec![];
        let mut row_group_bytes_arr = vec![];
        let mut column_id_arr = vec![];
        let mut file_offset_arr = vec![];
        let mut num_values_arr = vec![];
        let mut path_in_schema_arr = vec![];
        let mut type_arr = vec![];
        let mut stats_min_arr = vec![];
        let mut stats_max_arr = vec![];
        let mut stats_null_count_arr = vec![];
        let mut stats_distinct_count_arr = vec![];
        let mut stats_min_value_arr = vec![];
        let mut stats_max_value_arr = vec![];
        let mut compression_arr = vec![];
        let mut encodings_arr = vec![];
        let mut index_page_offset_arr = vec![];
        let mut dictionary_page_offset_arr = vec![];
        let mut data_page_offset_arr = vec![];
        let mut total_compressed_size_arr = vec![];
        let mut total_uncompressed_size_arr = vec![];
        for (rg_idx, row_group) in metadata.row_groups().iter().enumerate() {
            for (col_idx, column) in row_group.columns().iter().enumerate() {
                filename_arr.push(filename.clone());
                row_group_id_arr.push(rg_idx as i64);
                row_group_num_rows_arr.push(row_group.num_rows());
                row_group_num_columns_arr.push(row_group.num_columns() as i64);
                row_group_bytes_arr.push(row_group.total_byte_size());
                column_id_arr.push(col_idx as i64);
                file_offset_arr.push(column.file_offset());
                num_values_arr.push(column.num_values());
                path_in_schema_arr.push(column.column_path().to_string());
                type_arr.push(column.column_type().to_string());
                if let Some(s) = column.statistics() {
                    let (min_val, max_val) = if s.has_min_max_set() {
                        let (min_val, max_val) = match s {
                            Statistics::Boolean(val) => {
                                (val.min().to_string(), val.max().to_string())
                            }
                            Statistics::Int32(val) => {
                                (val.min().to_string(), val.max().to_string())
                            }
                            Statistics::Int64(val) => {
                                (val.min().to_string(), val.max().to_string())
                            }
                            Statistics::Int96(val) => {
                                (val.min().to_string(), val.max().to_string())
                            }
                            Statistics::Float(val) => {
                                (val.min().to_string(), val.max().to_string())
                            }
                            Statistics::Double(val) => {
                                (val.min().to_string(), val.max().to_string())
                            }
                            Statistics::ByteArray(val) => {
                                (val.min().to_string(), val.max().to_string())
                            }
                            Statistics::FixedLenByteArray(val) => {
                                (val.min().to_string(), val.max().to_string())
                            }
                        };
                        (Some(min_val), Some(max_val))
                    } else {
                        (None, None)
                    };
                    stats_min_arr.push(min_val.clone());
                    stats_max_arr.push(max_val.clone());
                    stats_null_count_arr.push(Some(s.null_count() as i64));
                    stats_distinct_count_arr.push(s.distinct_count().map(|c| c as i64));
                    stats_min_value_arr.push(min_val);
                    stats_max_value_arr.push(max_val);
                } else {
                    stats_min_arr.push(None);
                    stats_max_arr.push(None);
                    stats_null_count_arr.push(None);
                    stats_distinct_count_arr.push(None);
                    stats_min_value_arr.push(None);
                    stats_max_value_arr.push(None);
                };
                compression_arr.push(format!("{:?}", column.compression()));
                encodings_arr.push(format!("{:?}", column.encodings()));
                index_page_offset_arr.push(column.index_page_offset());
                dictionary_page_offset_arr.push(column.dictionary_page_offset());
                data_page_offset_arr.push(column.data_page_offset());
                total_compressed_size_arr.push(column.compressed_size());
                total_uncompressed_size_arr.push(column.uncompressed_size());
            }
        }

        let rb = RecordBatch::try_new(
            schema.clone(),
            vec![
                Arc::new(StringArray::from(filename_arr)),
                Arc::new(Int64Array::from(row_group_id_arr)),
                Arc::new(Int64Array::from(row_group_num_rows_arr)),
                Arc::new(Int64Array::from(row_group_num_columns_arr)),
                Arc::new(Int64Array::from(row_group_bytes_arr)),
                Arc::new(Int64Array::from(column_id_arr)),
                Arc::new(Int64Array::from(file_offset_arr)),
                Arc::new(Int64Array::from(num_values_arr)),
                Arc::new(StringArray::from(path_in_schema_arr)),
                Arc::new(StringArray::from(type_arr)),
                Arc::new(StringArray::from(stats_min_arr)),
                Arc::new(StringArray::from(stats_max_arr)),
                Arc::new(Int64Array::from(stats_null_count_arr)),
                Arc::new(Int64Array::from(stats_distinct_count_arr)),
                Arc::new(StringArray::from(stats_min_value_arr)),
                Arc::new(StringArray::from(stats_max_value_arr)),
                Arc::new(StringArray::from(compression_arr)),
                Arc::new(StringArray::from(encodings_arr)),
                Arc::new(Int64Array::from(index_page_offset_arr)),
                Arc::new(Int64Array::from(dictionary_page_offset_arr)),
                Arc::new(Int64Array::from(data_page_offset_arr)),
                Arc::new(Int64Array::from(total_compressed_size_arr)),
                Arc::new(Int64Array::from(total_uncompressed_size_arr)),
            ],
        )?;

        let parquet_metadata = ParquetMetadataTable { schema, batch: rb };
        Ok(Arc::new(parquet_metadata))
    }
}
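Because the output schema above exposes per-column-chunk statistics, narrower queries can be used to inspect specific fields. A sketch (the file name is hypothetical; the column names come from the schema defined in this diff):

    SELECT path_in_schema, stats_min, stats_max, stats_null_count, compression
    FROM parquet_metadata('my_data.parquet')
    WHERE row_group_id = 0;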
35 changes: 35 additions & 0 deletions datafusion-cli/src/main.rs
@@ -22,6 +22,7 @@
use datafusion::execution::memory_pool::{FairSpillPool, GreedyMemoryPool};
use datafusion::execution::runtime_env::{RuntimeConfig, RuntimeEnv};
use datafusion::prelude::SessionContext;
use datafusion_cli::catalog::DynamicFileCatalog;
use datafusion_cli::functions::ParquetMetadataFunc;
use datafusion_cli::{
    exec,
    print_format::PrintFormat,
@@ -185,6 +186,8 @@ pub async fn main() -> Result<()> {
        ctx.state().catalog_list(),
        ctx.state_weak_ref(),
    )));
    // register `parquet_metadata` table function to get metadata from parquet files
    ctx.register_udtf("parquet_metadata", Arc::new(ParquetMetadataFunc {}));

    let mut print_options = PrintOptions {
        format: args.format,
@@ -328,6 +331,8 @@ fn extract_memory_pool_size(size: &str) -> Result<usize, String> {

#[cfg(test)]
mod tests {
    use datafusion::assert_batches_eq;

    use super::*;

    fn assert_conversion(input: &str, expected: Result<usize, String>) {
@@ -385,4 +390,34 @@ mod tests {

        Ok(())
    }

    #[tokio::test]
    async fn test_parquet_metadata_works() -> Result<(), DataFusionError> {
        let ctx = SessionContext::new();
        ctx.register_udtf("parquet_metadata", Arc::new(ParquetMetadataFunc {}));

        // input with single quote
        let sql =
            "SELECT * FROM parquet_metadata('../datafusion/core/tests/data/fixed_size_list_array.parquet')";
        let df = ctx.sql(sql).await?;
        let rbs = df.collect().await?;

        let expected = [
            "+-------------------------------------------------------------+--------------+--------------------+-----------------------+-----------------+-----------+-------------+------------+----------------+-------+-----------+-----------+------------------+----------------------+-----------------+-----------------+-------------+------------------------------+-------------------+------------------------+------------------+-----------------------+-------------------------+",
            "| filename | row_group_id | row_group_num_rows | row_group_num_columns | row_group_bytes | column_id | file_offset | num_values | path_in_schema | type | stats_min | stats_max | stats_null_count | stats_distinct_count | stats_min_value | stats_max_value | compression | encodings | index_page_offset | dictionary_page_offset | data_page_offset | total_compressed_size | total_uncompressed_size |",
            "+-------------------------------------------------------------+--------------+--------------------+-----------------------+-----------------+-----------+-------------+------------+----------------+-------+-----------+-----------+------------------+----------------------+-----------------+-----------------+-------------+------------------------------+-------------------+------------------------+------------------+-----------------------+-------------------------+",
            "| ../datafusion/core/tests/data/fixed_size_list_array.parquet | 0 | 2 | 1 | 123 | 0 | 125 | 4 | \"f0.list.item\" | INT64 | 1 | 4 | 0 | | 1 | 4 | SNAPPY | [RLE_DICTIONARY, PLAIN, RLE] | | 4 | 46 | 121 | 123 |",
            "+-------------------------------------------------------------+--------------+--------------------+-----------------------+-----------------+-----------+-------------+------------+----------------+-------+-----------+-----------+------------------+----------------------+-----------------+-----------------+-------------+------------------------------+-------------------+------------------------+------------------+-----------------------+-------------------------+",
        ];
        assert_batches_eq!(expected, &rbs);

        // input with double quote
        let sql =
            "SELECT * FROM parquet_metadata(\"../datafusion/core/tests/data/fixed_size_list_array.parquet\")";
        let df = ctx.sql(sql).await?;
        let rbs = df.collect().await?;
        assert_batches_eq!(expected, &rbs);

        Ok(())
    }
}
