Support parquet_metadata for datafusion-cli #8413

Merged · 4 commits · Dec 7, 2023
1 change: 1 addition & 0 deletions datafusion-cli/Cargo.lock

(Generated file; diff not rendered.)

1 change: 1 addition & 0 deletions datafusion-cli/Cargo.toml
@@ -40,6 +40,7 @@ env_logger = "0.9"
mimalloc = { version = "0.1", default-features = false }
object_store = { version = "0.8.0", features = ["aws", "gcp"] }
parking_lot = { version = "0.12" }
parquet = { version = "49.0.0", default-features = false }
regex = "1.8"
rustyline = "11.0"
tokio = { version = "1.24", features = ["macros", "rt", "rt-multi-thread", "sync", "parking_lot"] }
223 changes: 221 additions & 2 deletions datafusion-cli/src/functions.rs
@@ -16,12 +16,26 @@
// under the License.

//! Functions that are query-able and searchable via the `\h` command
use arrow::array::{Int64Array, StringArray};
use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
use arrow::record_batch::RecordBatch;
use arrow::util::pretty::pretty_format_batches;
use async_trait::async_trait;
use datafusion::common::DataFusionError;
use datafusion::common::{plan_err, Column};
use datafusion::datasource::function::TableFunctionImpl;
use datafusion::datasource::TableProvider;
use datafusion::error::Result;
use datafusion::execution::context::SessionState;
use datafusion::logical_expr::Expr;
use datafusion::physical_plan::memory::MemoryExec;
use datafusion::physical_plan::ExecutionPlan;
use datafusion::scalar::ScalarValue;
use parquet::file::reader::FileReader;
use parquet::file::serialized_reader::SerializedFileReader;
use parquet::file::statistics::Statistics;
use std::fmt;
use std::fs::File;
use std::str::FromStr;
use std::sync::Arc;

@@ -196,3 +210,208 @@ pub fn display_all_functions() -> Result<()> {
    println!("{}", pretty_format_batches(&[batch]).unwrap());
    Ok(())
}

/// Table provider backing the `parquet_metadata` table function
struct ParquetMetadataTable {
    schema: SchemaRef,
    batch: RecordBatch,
}

#[async_trait]
impl TableProvider for ParquetMetadataTable {
    fn as_any(&self) -> &dyn std::any::Any {
        self
    }

    fn schema(&self) -> arrow::datatypes::SchemaRef {
        self.schema.clone()
    }

    fn table_type(&self) -> datafusion::logical_expr::TableType {
        datafusion::logical_expr::TableType::Base
    }

    async fn scan(
        &self,
        _state: &SessionState,
        projection: Option<&Vec<usize>>,
        _filters: &[Expr],
        _limit: Option<usize>,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        Ok(Arc::new(MemoryExec::try_new(
            &[vec![self.batch.clone()]],
            TableProvider::schema(self),
            projection.cloned(),
        )?))
    }
}

/// `parquet_metadata` table function: returns one row of metadata per Parquet column chunk
pub struct ParquetMetadataFunc {}

impl TableFunctionImpl for ParquetMetadataFunc {
    fn call(&self, exprs: &[Expr]) -> Result<Arc<dyn TableProvider>> {
        let filename = match exprs.get(0) {
            Some(Expr::Literal(ScalarValue::Utf8(Some(s)))) => s, // single quote: parquet_metadata('x.parquet')
            Some(Expr::Column(Column { name, .. })) => name, // double quote: parquet_metadata("x.parquet")
            _ => {
                return plan_err!(
                    "parquet_metadata requires a string argument as its input"
                );
            }
        };

        let file = File::open(filename.clone())?;
        let reader = SerializedFileReader::new(file)?;
        let metadata = reader.metadata();

        let schema = Arc::new(Schema::new(vec![
            Field::new("filename", DataType::Utf8, true),
            Field::new("row_group_id", DataType::Int64, true),
            Field::new("row_group_num_rows", DataType::Int64, true),
            Field::new("row_group_num_columns", DataType::Int64, true),
            Field::new("row_group_bytes", DataType::Int64, true),
            Field::new("column_id", DataType::Int64, true),
            Field::new("file_offset", DataType::Int64, true),
            Field::new("num_values", DataType::Int64, true),
            Field::new("path_in_schema", DataType::Utf8, true),
            Field::new("type", DataType::Utf8, true),
            Field::new("stats_min", DataType::Utf8, true),
            Field::new("stats_max", DataType::Utf8, true),
            Field::new("stats_null_count", DataType::Int64, true),
            Field::new("stats_distinct_count", DataType::Int64, true),
            Field::new("stats_min_value", DataType::Utf8, true),
            Field::new("stats_max_value", DataType::Utf8, true),
            Field::new("compression", DataType::Utf8, true),
            Field::new("encodings", DataType::Utf8, true),
            Field::new("index_page_offset", DataType::Int64, true),
            Field::new("dictionary_page_offset", DataType::Int64, true),
            Field::new("data_page_offset", DataType::Int64, true),
            Field::new("total_compressed_size", DataType::Int64, true),
            Field::new("total_uncompressed_size", DataType::Int64, true),
        ]));

        // Construct a RecordBatch from the metadata: one row per column chunk in the file
        let mut filename_arr = vec![];
        let mut row_group_id_arr = vec![];
        let mut row_group_num_rows_arr = vec![];
        let mut row_group_num_columns_arr = vec![];
        let mut row_group_bytes_arr = vec![];
        let mut column_id_arr = vec![];
        let mut file_offset_arr = vec![];
        let mut num_values_arr = vec![];
        let mut path_in_schema_arr = vec![];
        let mut type_arr = vec![];
        let mut stats_min_arr = vec![];
        let mut stats_max_arr = vec![];
        let mut stats_null_count_arr = vec![];
        let mut stats_distinct_count_arr = vec![];
        let mut stats_min_value_arr = vec![];
        let mut stats_max_value_arr = vec![];
        let mut compression_arr = vec![];
        let mut encodings_arr = vec![];
        let mut index_page_offset_arr = vec![];
        let mut dictionary_page_offset_arr = vec![];
        let mut data_page_offset_arr = vec![];
        let mut total_compressed_size_arr = vec![];
        let mut total_uncompressed_size_arr = vec![];
        for (rg_idx, row_group) in metadata.row_groups().iter().enumerate() {
            for (col_idx, column) in row_group.columns().iter().enumerate() {
                filename_arr.push(filename.clone());
                row_group_id_arr.push(rg_idx as i64);
                row_group_num_rows_arr.push(row_group.num_rows());
                row_group_num_columns_arr.push(row_group.num_columns() as i64);
                row_group_bytes_arr.push(row_group.total_byte_size());
                column_id_arr.push(col_idx as i64);
                file_offset_arr.push(column.file_offset());
                num_values_arr.push(column.num_values());
                path_in_schema_arr.push(column.column_path().to_string());
                type_arr.push(column.column_type().to_string());
                if let Some(s) = column.statistics() {
                    let (min_val, max_val) = if s.has_min_max_set() {
                        let (min_val, max_val) = match s {
                            Statistics::Boolean(val) => {
                                (val.min().to_string(), val.max().to_string())
                            }
                            Statistics::Int32(val) => {
                                (val.min().to_string(), val.max().to_string())
                            }
                            Statistics::Int64(val) => {
                                (val.min().to_string(), val.max().to_string())
                            }
                            Statistics::Int96(val) => {
                                (val.min().to_string(), val.max().to_string())
                            }
                            Statistics::Float(val) => {
                                (val.min().to_string(), val.max().to_string())
                            }
                            Statistics::Double(val) => {
                                (val.min().to_string(), val.max().to_string())
                            }
                            Statistics::ByteArray(val) => {
                                (val.min().to_string(), val.max().to_string())
                            }
                            Statistics::FixedLenByteArray(val) => {
                                (val.min().to_string(), val.max().to_string())
                            }
                        };
                        (Some(min_val), Some(max_val))
                    } else {
                        (None, None)
                    };
                    stats_min_arr.push(min_val.clone());
                    stats_max_arr.push(max_val.clone());
                    stats_null_count_arr.push(Some(s.null_count() as i64));
                    stats_distinct_count_arr.push(s.distinct_count().map(|c| c as i64));
                    stats_min_value_arr.push(min_val);
                    stats_max_value_arr.push(max_val);
                } else {
                    stats_min_arr.push(None);
                    stats_max_arr.push(None);
                    stats_null_count_arr.push(None);
                    stats_distinct_count_arr.push(None);
                    stats_min_value_arr.push(None);
                    stats_max_value_arr.push(None);
                };
                compression_arr.push(format!("{:?}", column.compression()));
                encodings_arr.push(format!("{:?}", column.encodings()));
                index_page_offset_arr.push(column.index_page_offset());
                dictionary_page_offset_arr.push(column.dictionary_page_offset());
                data_page_offset_arr.push(column.data_page_offset());
                total_compressed_size_arr.push(column.compressed_size());
                total_uncompressed_size_arr.push(column.uncompressed_size());
            }
        }

        let rb = RecordBatch::try_new(
            schema.clone(),
            vec![
                Arc::new(StringArray::from(filename_arr)),
                Arc::new(Int64Array::from(row_group_id_arr)),
                Arc::new(Int64Array::from(row_group_num_rows_arr)),
                Arc::new(Int64Array::from(row_group_num_columns_arr)),
                Arc::new(Int64Array::from(row_group_bytes_arr)),
                Arc::new(Int64Array::from(column_id_arr)),
                Arc::new(Int64Array::from(file_offset_arr)),
                Arc::new(Int64Array::from(num_values_arr)),
                Arc::new(StringArray::from(path_in_schema_arr)),
                Arc::new(StringArray::from(type_arr)),
                Arc::new(StringArray::from(stats_min_arr)),
                Arc::new(StringArray::from(stats_max_arr)),
                Arc::new(Int64Array::from(stats_null_count_arr)),
                Arc::new(Int64Array::from(stats_distinct_count_arr)),
                Arc::new(StringArray::from(stats_min_value_arr)),
                Arc::new(StringArray::from(stats_max_value_arr)),
                Arc::new(StringArray::from(compression_arr)),
                Arc::new(StringArray::from(encodings_arr)),
                Arc::new(Int64Array::from(index_page_offset_arr)),
                Arc::new(Int64Array::from(dictionary_page_offset_arr)),
                Arc::new(Int64Array::from(data_page_offset_arr)),
                Arc::new(Int64Array::from(total_compressed_size_arr)),
                Arc::new(Int64Array::from(total_uncompressed_size_arr)),
            ],
        )?;

        let parquet_metadata = ParquetMetadataTable { schema, batch: rb };
        Ok(Arc::new(parquet_metadata))
    }
}
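
Taken together, these pieces implement DataFusion's user-defined table function (UDTF) interface. For orientation, here is a minimal sketch of registering and querying the function programmatically, mirroring what the CLI change below does; the file path data.parquet is illustrative:

use std::sync::Arc;
use datafusion::error::Result;
use datafusion::prelude::SessionContext;
use datafusion_cli::functions::ParquetMetadataFunc;

#[tokio::main]
async fn main() -> Result<()> {
    let ctx = SessionContext::new();
    // Register the UDTF under the same name datafusion-cli uses below
    ctx.register_udtf("parquet_metadata", Arc::new(ParquetMetadataFunc {}));
    // Each returned row describes one column chunk of the file
    let df = ctx
        .sql("SELECT * FROM parquet_metadata('data.parquet')")
        .await?;
    df.show().await?;
    Ok(())
}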
35 changes: 35 additions & 0 deletions datafusion-cli/src/main.rs
@@ -22,6 +22,7 @@ use datafusion::execution::memory_pool::{FairSpillPool, GreedyMemoryPool};
use datafusion::execution::runtime_env::{RuntimeConfig, RuntimeEnv};
use datafusion::prelude::SessionContext;
use datafusion_cli::catalog::DynamicFileCatalog;
use datafusion_cli::functions::ParquetMetadataFunc;
use datafusion_cli::{
    exec,
    print_format::PrintFormat,
@@ -185,6 +186,8 @@ pub async fn main() -> Result<()> {
        ctx.state().catalog_list(),
        ctx.state_weak_ref(),
    )));
    // register `parquet_metadata` table function to get metadata from parquet files
    ctx.register_udtf("parquet_metadata", Arc::new(ParquetMetadataFunc {}));
    let mut print_options = PrintOptions {
        format: args.format,
@@ -328,6 +331,8 @@ fn extract_memory_pool_size(size: &str) -> Result<usize, String> {

#[cfg(test)]
mod tests {
    use datafusion::assert_batches_eq;

    use super::*;

    fn assert_conversion(input: &str, expected: Result<usize, String>) {
@@ -385,4 +390,34 @@ mod tests {

        Ok(())
    }

    #[tokio::test]
    async fn test_parquet_metadata_works() -> Result<(), DataFusionError> {
        let ctx = SessionContext::new();
        ctx.register_udtf("parquet_metadata", Arc::new(ParquetMetadataFunc {}));

        // input with single quote
        let sql =
            "SELECT * FROM parquet_metadata('../datafusion/core/tests/data/fixed_size_list_array.parquet')";
        let df = ctx.sql(sql).await?;
        let rbs = df.collect().await?;

        let expected = [
            "+-------------------------------------------------------------+--------------+--------------------+-----------------------+-----------------+-----------+-------------+------------+----------------+-------+-----------+-----------+------------------+----------------------+-----------------+-----------------+-------------+------------------------------+-------------------+------------------------+------------------+-----------------------+-------------------------+",
            "| filename | row_group_id | row_group_num_rows | row_group_num_columns | row_group_bytes | column_id | file_offset | num_values | path_in_schema | type | stats_min | stats_max | stats_null_count | stats_distinct_count | stats_min_value | stats_max_value | compression | encodings | index_page_offset | dictionary_page_offset | data_page_offset | total_compressed_size | total_uncompressed_size |",
            "+-------------------------------------------------------------+--------------+--------------------+-----------------------+-----------------+-----------+-------------+------------+----------------+-------+-----------+-----------+------------------+----------------------+-----------------+-----------------+-------------+------------------------------+-------------------+------------------------+------------------+-----------------------+-------------------------+",
            "| ../datafusion/core/tests/data/fixed_size_list_array.parquet | 0 | 2 | 1 | 123 | 0 | 125 | 4 | \"f0.list.item\" | INT64 | 1 | 4 | 0 | | 1 | 4 | SNAPPY | [RLE_DICTIONARY, PLAIN, RLE] | | 4 | 46 | 121 | 123 |",
            "+-------------------------------------------------------------+--------------+--------------------+-----------------------+-----------------+-----------+-------------+------------+----------------+-------+-----------+-----------+------------------+----------------------+-----------------+-----------------+-------------+------------------------------+-------------------+------------------------+------------------+-----------------------+-------------------------+",
        ];
        assert_batches_eq!(expected, &rbs);

        // input with double quote
        let sql =
            "SELECT * FROM parquet_metadata(\"../datafusion/core/tests/data/fixed_size_list_array.parquet\")";
        let df = ctx.sql(sql).await?;
        let rbs = df.collect().await?;
        assert_batches_eq!(expected, &rbs);

        Ok(())
    }
}
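
As a usage note on the design exercised by these tests: the file path may be passed either as a SQL string literal (single quotes), which reaches call() as Expr::Literal, or as a double-quoted identifier, which reaches it as Expr::Column; the match in functions.rs accepts both spellings of the same argument.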
}