display like duckdb
Signed-off-by: veeupup <[email protected]>
Veeupup committed Dec 6, 2023
1 parent b367b9e commit 5516189
Showing 1 changed file with 134 additions and 47 deletions.
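This commit reworks the `parquet_metadata` table function in datafusion-cli to emit one row per column chunk per row group (filename, row-group id, per-column statistics, compression, encodings, and page offsets), matching the output shape of DuckDB's `parquet_metadata()` function instead of the previous one-summary-row-per-row-group layout. Below is a minimal standalone sketch of the parquet-rs metadata traversal the new code relies on; the file name `example.parquet` and the printed fields are illustrative assumptions, not part of the commit:

```rust
use std::fs::File;

use parquet::file::reader::FileReader;
use parquet::file::serialized_reader::SerializedFileReader;

fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Hypothetical input path; any local parquet file would do.
    let file = File::open("example.parquet")?;
    let reader = SerializedFileReader::new(file)?;
    let metadata = reader.metadata();

    // Visit every (row group, column chunk) pair -- the commit emits
    // exactly one output row for each such pair.
    for (rg_idx, row_group) in metadata.row_groups().iter().enumerate() {
        for (col_idx, column) in row_group.columns().iter().enumerate() {
            println!(
                "row_group={} column={} path={} type={} compressed_bytes={}",
                rg_idx,
                col_idx,
                column.column_path(),
                column.column_type(),
                column.compressed_size(),
            );
        }
    }
    Ok(())
}
```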
datafusion-cli/src/functions.rs: 134 additions & 47 deletions
@@ -16,7 +16,6 @@
// under the License.

//! Functions that are query-able and searchable via the `\h` command
-use arrow::array::Int32Array;
use arrow::array::{Int64Array, StringArray};
use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
use arrow::record_batch::RecordBatch;
@@ -33,6 +32,7 @@ use datafusion::physical_plan::memory::MemoryExec;
use datafusion::physical_plan::ExecutionPlan;
use parquet::file::reader::FileReader;
use parquet::file::serialized_reader::SerializedFileReader;
+use parquet::file::statistics::Statistics;
use std::fmt;
use std::fs::File;
use std::str::FromStr;
@@ -250,65 +250,152 @@ pub struct ParquetMetadataFunc {}
impl TableFunctionImpl for ParquetMetadataFunc {
    fn call(&self, exprs: &[Expr]) -> Result<Arc<dyn TableProvider>> {
        let Some(Expr::Column(Column { name, .. })) = exprs.get(0) else {
-           return plan_err!("read_csv requires at least one string argument");
+           return plan_err!("parquet_metadata requires string argument as its input");
        };

-       let file = File::open(name)?;
+       let file = File::open(name.clone())?;
        let reader = SerializedFileReader::new(file)?;
        let metadata = reader.metadata();

        let schema = Arc::new(Schema::new(vec![
Field::new("version", DataType::Int32, false),
Field::new("num_rows", DataType::Int64, false),
Field::new("created_by", DataType::Utf8, false),
Field::new("columns_order", DataType::Utf8, false),
Field::new("num_row_groups", DataType::Int64, false),
Field::new("row_groups", DataType::Utf8, false),
Field::new("filename", DataType::Utf8, true),
Field::new("row_group_id", DataType::Int64, true),
Field::new("row_group_num_rows", DataType::Int64, true),
Field::new("row_group_num_columns", DataType::Int64, true),
Field::new("row_group_bytes", DataType::Int64, true),
Field::new("column_id", DataType::Int64, true),
Field::new("file_offset", DataType::Int64, true),
Field::new("num_values", DataType::Int64, true),
Field::new("path_in_schema", DataType::Utf8, true),
Field::new("type", DataType::Utf8, true),
Field::new("stats_min", DataType::Utf8, true),
Field::new("stats_max", DataType::Utf8, true),
Field::new("stats_null_count", DataType::Int64, true),
Field::new("stats_distinct_count", DataType::Int64, true),
Field::new("stats_min_value", DataType::Utf8, true),
Field::new("stats_max_value", DataType::Utf8, true),
Field::new("compression", DataType::Utf8, true),
Field::new("encodings", DataType::Utf8, true),
Field::new("index_page_offset", DataType::Int64, true),
Field::new("dictionary_page_offset", DataType::Int64, true),
Field::new("data_page_offset", DataType::Int64, true),
Field::new("total_compressed_size", DataType::Int64, true),
Field::new("total_uncompressed_size", DataType::Int64, true),
]));

        // construct recordbatch from metadata
-       let num_groups = metadata.num_row_groups();
-       let row_groups = metadata
-           .row_groups()
-           .iter()
-           .map(|rg| {
-               format!(
-                   "num_columns: {}, num_rows: {}, total_byte_size: {}, sorting_columns: {:?}",
-                   rg.num_columns(),
-                   rg.num_rows(),
-                   rg.total_byte_size(),
-                   rg.sorting_columns()
-               )
-           })
-           .collect::<Vec<String>>();
+       let mut filename_arr = vec![];
+       let mut row_group_id_arr = vec![];
+       let mut row_group_num_rows_arr = vec![];
+       let mut row_group_num_columns_arr = vec![];
+       let mut row_group_bytes_arr = vec![];
+       let mut column_id_arr = vec![];
+       let mut file_offset_arr = vec![];
+       let mut num_values_arr = vec![];
+       let mut path_in_schema_arr = vec![];
+       let mut type_arr = vec![];
+       let mut stats_min_arr = vec![];
+       let mut stats_max_arr = vec![];
+       let mut stats_null_count_arr = vec![];
+       let mut stats_distinct_count_arr = vec![];
+       let mut stats_min_value_arr = vec![];
+       let mut stats_max_value_arr = vec![];
+       let mut compression_arr = vec![];
+       let mut encodings_arr = vec![];
+       let mut index_page_offset_arr = vec![];
+       let mut dictionary_page_offset_arr = vec![];
+       let mut data_page_offset_arr = vec![];
+       let mut total_compressed_size_arr = vec![];
+       let mut total_uncompressed_size_arr = vec![];
+       for (rg_idx, row_group) in metadata.row_groups().iter().enumerate() {
+           for (col_idx, column) in row_group.columns().iter().enumerate() {
+               filename_arr.push(name.clone());
+               row_group_id_arr.push(rg_idx as i64);
+               row_group_num_rows_arr.push(row_group.num_rows());
+               row_group_num_columns_arr.push(row_group.num_columns() as i64);
+               row_group_bytes_arr.push(row_group.total_byte_size());
+               column_id_arr.push(col_idx as i64);
+               file_offset_arr.push(column.file_offset());
+               num_values_arr.push(column.num_values());
+               path_in_schema_arr.push(column.column_path().to_string());
+               type_arr.push(column.column_type().to_string());
+               if let Some(s) = column.statistics() {
+                   let (min_val, max_val) = match s {
+                       Statistics::Boolean(val) => {
+                           (val.min().to_string(), val.max().to_string())
+                       }
+                       Statistics::Int32(val) => {
+                           (val.min().to_string(), val.max().to_string())
+                       }
+                       Statistics::Int64(val) => {
+                           (val.min().to_string(), val.max().to_string())
+                       }
+                       Statistics::Int96(val) => {
+                           (val.min().to_string(), val.max().to_string())
+                       }
+                       Statistics::Float(val) => {
+                           (val.min().to_string(), val.max().to_string())
+                       }
+                       Statistics::Double(val) => {
+                           (val.min().to_string(), val.max().to_string())
+                       }
+                       Statistics::ByteArray(val) => {
+                           (val.min().to_string(), val.max().to_string())
+                       }
+                       Statistics::FixedLenByteArray(val) => {
+                           (val.min().to_string(), val.max().to_string())
+                       }
+                   };
+                   stats_min_arr.push(Some(min_val.clone()));
+                   stats_max_arr.push(Some(max_val.clone()));
+                   stats_null_count_arr.push(Some(s.null_count() as i64));
+                   stats_distinct_count_arr.push(s.distinct_count().map(|c| c as i64));
+                   stats_min_value_arr.push(Some(min_val));
+                   stats_max_value_arr.push(Some(max_val));
+               } else {
+                   stats_min_arr.push(None);
+                   stats_max_arr.push(None);
+                   stats_null_count_arr.push(None);
+                   stats_distinct_count_arr.push(None);
+                   stats_min_value_arr.push(None);
+                   stats_max_value_arr.push(None);
+               };
+               compression_arr.push(format!("{:?}", column.compression()));
+               encodings_arr.push(format!("{:?}", column.encodings()));
+               index_page_offset_arr.push(column.index_page_offset());
+               dictionary_page_offset_arr.push(column.dictionary_page_offset());
+               data_page_offset_arr.push(column.data_page_offset());
+               total_compressed_size_arr.push(column.compressed_size());
+               total_uncompressed_size_arr.push(column.uncompressed_size());
+           }
+       }

        let rb = RecordBatch::try_new(
            schema.clone(),
            vec![
-               Arc::new(Int32Array::from(vec![
-                   metadata.file_metadata().version();
-                   num_groups
-               ])),
-               Arc::new(Int64Array::from(vec![
-                   metadata.file_metadata().num_rows();
-                   num_groups
-               ])),
-               Arc::new(StringArray::from(vec![
-                   format!(
-                       "{:?}",
-                       metadata.file_metadata().created_by()
-                   );
-                   num_groups
-               ])),
-               Arc::new(StringArray::from(vec![
-                   format!(
-                       "{:?}",
-                       metadata.file_metadata().column_orders()
-                   );
-                   num_groups
-               ])),
-               Arc::new(Int64Array::from(vec![num_groups as i64; num_groups])),
-               Arc::new(StringArray::from(row_groups)),
+               Arc::new(StringArray::from(filename_arr)),
+               Arc::new(Int64Array::from(row_group_id_arr)),
+               Arc::new(Int64Array::from(row_group_num_rows_arr)),
+               Arc::new(Int64Array::from(row_group_num_columns_arr)),
+               Arc::new(Int64Array::from(row_group_bytes_arr)),
+               Arc::new(Int64Array::from(column_id_arr)),
+               Arc::new(Int64Array::from(file_offset_arr)),
+               Arc::new(Int64Array::from(num_values_arr)),
+               Arc::new(StringArray::from(path_in_schema_arr)),
+               Arc::new(StringArray::from(type_arr)),
+               Arc::new(StringArray::from(stats_min_arr)),
+               Arc::new(StringArray::from(stats_max_arr)),
+               Arc::new(Int64Array::from(stats_null_count_arr)),
+               Arc::new(Int64Array::from(stats_distinct_count_arr)),
+               Arc::new(StringArray::from(stats_min_value_arr)),
+               Arc::new(StringArray::from(stats_max_value_arr)),
+               Arc::new(StringArray::from(compression_arr)),
+               Arc::new(StringArray::from(encodings_arr)),
+               Arc::new(Int64Array::from(index_page_offset_arr)),
+               Arc::new(Int64Array::from(dictionary_page_offset_arr)),
+               Arc::new(Int64Array::from(data_page_offset_arr)),
+               Arc::new(Int64Array::from(total_compressed_size_arr)),
+               Arc::new(Int64Array::from(total_uncompressed_size_arr)),
            ],
        )?;
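
The new body follows a common Arrow construction pattern: accumulate one plain `Vec` per output column in lock-step, convert each to a typed array (`Option` values such as those in `stats_min_arr` become nulls), and hand all of them to `RecordBatch::try_new` together with the schema. A minimal sketch of that pattern, using a hypothetical two-column schema rather than the commit's 23 columns:

```rust
use std::sync::Arc;

use arrow::array::{Int64Array, StringArray};
use arrow::datatypes::{DataType, Field, Schema};
use arrow::record_batch::RecordBatch;

fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Hypothetical two-column schema; both columns are nullable,
    // like the stats columns in the commit.
    let schema = Arc::new(Schema::new(vec![
        Field::new("row_group_id", DataType::Int64, true),
        Field::new("stats_min", DataType::Utf8, true),
    ]));

    // One Vec per column, pushed in lock-step so lengths stay equal.
    let row_group_id_arr: Vec<i64> = vec![0, 0, 1];
    let stats_min_arr: Vec<Option<String>> =
        vec![Some("1".to_string()), None, Some("42".to_string())];

    // None entries surface as nulls in the nullable Utf8 column.
    let rb = RecordBatch::try_new(
        schema,
        vec![
            Arc::new(Int64Array::from(row_group_id_arr)),
            Arc::new(StringArray::from(stats_min_arr)),
        ],
    )?;
    assert_eq!(rb.num_rows(), 3);
    Ok(())
}
```

Keeping every per-column `Vec` the same length is what lets `RecordBatch::try_new` succeed; it returns an error if column lengths or types disagree with the schema.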
