From cdf36577c47e8a7c059dc7ebd4defdc754bb1d85 Mon Sep 17 00:00:00 2001 From: kazuhiko kikuchi Date: Mon, 6 Jun 2022 12:59:31 +0900 Subject: [PATCH 01/13] add parquet-fromcsv (#1) add command line tool for convert csv to parquet. --- parquet/Cargo.toml | 7 +- parquet/src/bin/parquet-fromcsv.rs | 571 +++++++++++++++++++++++++++++ 2 files changed, 577 insertions(+), 1 deletion(-) create mode 100644 parquet/src/bin/parquet-fromcsv.rs diff --git a/parquet/Cargo.toml b/parquet/Cargo.toml index bb7a8cd10583..9f96c04d2a02 100644 --- a/parquet/Cargo.toml +++ b/parquet/Cargo.toml @@ -49,6 +49,7 @@ serde_json = { version = "1.0", features = ["preserve_order"], optional = true } rand = "0.8" futures = { version = "0.3", optional = true } tokio = { version = "1.0", optional = true, default-features = false, features = ["macros", "fs", "rt", "io-util"] } +anyhow = { version = "1.0.57", optional = true } [dev-dependencies] criterion = "0.3" @@ -66,7 +67,7 @@ all-features = true [features] default = ["arrow", "snap", "brotli", "flate2", "lz4", "zstd", "base64"] -cli = ["serde_json", "base64", "clap"] +cli = ["serde_json", "base64", "clap", "anyhow", "arrow/csv"] test_common = [] # Experimental, unstable functionality primarily used for testing experimental = [] @@ -85,6 +86,10 @@ required-features = ["cli"] name = "parquet-rowcount" required-features = ["cli"] +[[bin]] +name = "parquet-fromcsv" +required-features = ["cli"] + [[bench]] name = "arrow_writer" harness = false diff --git a/parquet/src/bin/parquet-fromcsv.rs b/parquet/src/bin/parquet-fromcsv.rs new file mode 100644 index 000000000000..b596b1429d71 --- /dev/null +++ b/parquet/src/bin/parquet-fromcsv.rs @@ -0,0 +1,571 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Binary file to converts csv to Parquet file +//! +//! # Install +//! +//! `parquet-fromcsv` can be installed using `cargo`: +//! ``` +//! cargo install parquet --features=cli +//! ``` +//! After this `parquet-fromcsv` shoud be available: +//! ``` +//! parquet-fromcsv --schema message_schema_for_parquet.txt input.csv output.parquet +//! ``` +//! +//! The binary can also be build form the source code and run as follows: +//! ``` +//! cargo run --features=cli --bin parquet-fromcsv --schema message_schema_for_parquet.txt \ +//! \ input.csv output.parquet +//! ``` +//! +//! # Options +//! +//! ``` +//! parquet 14.0.0 +//! Apache Arrow +//! Binary file to converts csv to Parquet file +//! +//! USAGE: +//! parquet-fromcsv [OPTIONS] --schema --input-file --output-file +//! +//! OPTIONS: +//! -b, --batch-size +//! batch size +//! +//! [env: PARQUET_FROM_CSV_BATCHSIZE=] +//! [default: 1000] +//! +//! -c, --parquet-compression +//! compression mode +//! +//! [default: SNAPPY] +//! +//! -d, --delimiter +//! field delimiter +//! +//! 
default value: when input_format==CSV: ',' when input_format==TSV: 'TAB' +//! +//! -D, --double-quote +//! double quote +//! +//! -e, --escape-char +//! escape charactor +//! +//! -f, --input-format +//! input file format +//! +//! [default: csv] +//! [possible values: csv, tsv] +//! +//! -h, --has-header +//! has header +//! +//! --help +//! Print help information +//! +//! -i, --input-file +//! input CSV file +//! +//! -o, --output-file +//! output Parquet file +//! +//! -q, --quote-char +//! quate charactor +//! +//! -r, --record-terminator +//! record terminator +//! +//! [possible values: lf, crlf, cr] +//! +//! -s, --schema +//! message schema for output Parquet +//! +//! -V, --version +//! Print version information +//! ``` +//! +//! ## Parquet file options +//! +//! - `-b`, `--batch-size` : Batch size for Parquet +//! - `-c`, `--parquet-compression` : Compression option for Parquet, default is SNAPPY +//! - `-s`, `--schema` : path to message schema for generated Parquet file +//! - `-o`, `--output-file` : path to output parquet file +//! +//! ## Input file options +//! +//! - `-i`, `--input-file` : path to input CSV file +//! - `-f`, `--input-format` : dialect for input file, `csv` or `tsv`. +//! - `-d`, `--delimiter : Field delimitor for CSV file, default depends `--input-format` +//! - `-e`, `--escape` : Escape charactor for input file +//! - `-h`, `--has-header` : input has header +//! - `-r`, `--record-terminator` : record terminator charactor for input. default is CRLF +//! - `-q`, `--quote-char` : input quoting charactor +//! + +use std::{ + fs::{read_to_string, File}, + path::{Path, PathBuf}, + sync::Arc, +}; + +use anyhow::{bail, Context, Error, Result}; +use arrow::{csv::ReaderBuilder, datatypes::Schema}; +use clap::{ArgEnum, Parser}; +use parquet::{ + arrow::{parquet_to_arrow_schema, ArrowWriter}, + basic::Compression, + file::properties::WriterProperties, + schema::{parser::parse_message_type, types::SchemaDescriptor}, +}; + +#[derive(Debug, Parser)] +#[clap(author, version, about("Binary file to convert csv to Parquet file"), long_about=None)] +struct Args { + /// Parquet schema file path + #[clap(short, long, help("message schema for output Parquet"))] + schema: PathBuf, + /// input CSV file path + #[clap(short, long, help("input CSV file"))] + input_file: PathBuf, + /// output Parquet file path + #[clap(short, long, help("output Parquet file"))] + output_file: PathBuf, + /// input file format + #[clap( + arg_enum, + short('f'), + long, + help("input file format"), + default_value_t=CsvDialect::CSV + )] + input_format: CsvDialect, + /// batch size + #[clap( + short, + long, + help("batch size"), + default_value_t = 1000, + env = "PARQUET_FROM_CSV_BATCHSIZE" + )] + batch_size: usize, + /// has header line + #[clap(short, long, help("has header"))] + has_header: bool, + /// field delimiter + /// + /// default value: + /// when input_format==CSV: ',' + /// when input_format==TSV: 'TAB' + #[clap(short, long, help("field delimiter"))] + delimiter: Option, + #[clap(arg_enum, short, long, help("record terminator"))] + record_terminator: Option, + #[clap(short, long, help("escape charactor"))] + escape_char: Option, + #[clap(short, long, help("quate charactor"))] + quote_char: Option, + #[clap(short('D'), long, help("double quote"))] + double_quote: Option, + #[clap(short('c'), long, help("compression mode"), default_value_t=Compression::SNAPPY)] + #[clap(parse(try_from_str =compression_from_str))] + parquet_compression: Compression, +} + +fn compression_from_str(cmp: &str) -> 
Result<Compression> {
    match cmp.to_uppercase().as_str() {
        "UNCOMPRESSED" => Ok(Compression::UNCOMPRESSED),
        "SNAPPY" => Ok(Compression::SNAPPY),
        "GZIP" => Ok(Compression::GZIP),
        "LZO" => Ok(Compression::LZO),
        "BROTLI" => Ok(Compression::BROTLI),
        "LZ4" => Ok(Compression::LZ4),
        "ZSTD" => Ok(Compression::ZSTD),
        v => bail!("Unknown compression {0} : possible values UNCOMPRESSED, SNAPPY, GZIP, LZO, BROTLI, LZ4, ZSTD ", v),
    }
}

impl Args {
    fn schema_path(&self) -> &Path {
        self.schema.as_path()
    }
    fn get_delimiter(&self) -> u8 {
        match self.delimiter {
            Some(ch) => ch as u8,
            None => match self.input_format {
                CsvDialect::CSV => b',',
                CsvDialect::TSV => b'\t',
            },
        }
    }
    fn get_terminator(&self) -> Option<u8> {
        match self.record_terminator {
            Some(RecordTerminator::LF) => Some(0x0a),
            Some(RecordTerminator::CR) => Some(0x0d),
            Some(RecordTerminator::CRLF) => None,
            None => match self.input_format {
                CsvDialect::CSV => None,
                CsvDialect::TSV => Some(0x0a),
            },
        }
    }
    fn get_escape(&self) -> Option<u8> {
        self.escape_char.map(|ch| ch as u8)
    }
    fn get_quote(&self) -> Option<u8> {
        if self.quote_char.is_none() {
            match self.input_format {
                CsvDialect::CSV => Some(b'\"'),
                CsvDialect::TSV => None,
            }
        } else {
            self.quote_char.map(|c| c as u8)
        }
    }
    fn get_quoting(&self) -> Option<bool> {
        if let Some(_qch) = self.quote_char {
            Some(true)
        } else {
            match self.input_format {
                CsvDialect::CSV => None,
                CsvDialect::TSV => Some(false),
            }
        }
    }
}

#[derive(Debug, Clone, Copy, ArgEnum, PartialEq)]
enum CsvDialect {
    CSV,
    TSV,
}

#[derive(Debug, Clone, Copy, ArgEnum, PartialEq)]
enum RecordTerminator {
    LF,
    CRLF,
    CR,
}

fn configure_writer_properties(compression: Compression) -> WriterProperties {
    let properties_builder = WriterProperties::builder().set_compression(compression);
    properties_builder.build()
}

fn configure_reader_builder(args: &Args, arrow_schema: Arc<Schema>) -> ReaderBuilder {
    fn configure_reader<T, F: Fn(ReaderBuilder, T) -> ReaderBuilder>(
        builder: ReaderBuilder,
        value: Option<T>,
        fun: F,
    ) -> ReaderBuilder {
        if let Some(val) = value {
            fun(builder, val)
        } else {
            builder
        }
    }

    let mut builder = ReaderBuilder::new()
        .with_schema(arrow_schema)
        .with_batch_size(args.batch_size)
        .has_header(args.has_header)
        .with_delimiter(args.get_delimiter());

    builder = configure_reader(
        builder,
        args.get_terminator(),
        ReaderBuilder::with_terminator,
    );
    builder = configure_reader(builder, args.get_escape(), ReaderBuilder::with_escape);
    builder = configure_reader(builder, args.get_quote(), ReaderBuilder::with_quote);

    builder
}

fn arrow_schema_from_string(schema: &str) -> Result<Arc<Schema>> {
    let schema = Arc::new(parse_message_type(&schema)?);
    let desc = SchemaDescriptor::new(schema);
    let arrow_schema = Arc::new(parquet_to_arrow_schema(&desc, None)?);
    Ok(arrow_schema)
}

fn convert_csv_to_parquet(args: &Args) -> Result<()> {
    let schema = read_to_string(args.schema_path()).with_context(|| {
        format!("Failed to open schema file {:#?}", args.schema_path())
    })?;
    let arrow_schema = arrow_schema_from_string(&schema)?;

    // create output parquet writer
    let parquet_file = File::create(&args.output_file).context(format!(
        "Failed to create output file {:#?}",
        &args.output_file
    ))?;

    let writer_properties = Some(configure_writer_properties(args.parquet_compression));
    let mut arrow_writer =
        ArrowWriter::try_new(parquet_file, arrow_schema.clone(), writer_properties)
            .context("Failed to create ArrowWriter")?;
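
    // Everything below streams: the CSV reader yields RecordBatches of at
    // most `--batch-size` rows, each batch is appended to the ArrowWriter,
    // and closing the writer flushes the final row group and writes the
    // Parquet footer. Peak memory is therefore bounded by the batch and
    // row-group sizes rather than the input file size.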
+ // open input file + let input_file = File::open(&args.input_file) + .with_context(|| format!("Failed to open input file {:#?}", &args.input_file))?; + // create input csv reader + let builder = configure_reader_builder(&args, arrow_schema); + let reader = builder.build(input_file)?; + for batch_result in reader { + let batch = batch_result.context("Failed to read RecordBatch from CSV")?; + arrow_writer + .write(&batch) + .context("Failed to write RecordBatch to parquet")?; + } + arrow_writer.close().context("Failed to close parquet")?; + Ok(()) +} + +fn main() -> Result<()> { + let args = Args::parse(); + convert_csv_to_parquet(&args) +} + +#[cfg(test)] +mod tests { + use std::{ + io::{Seek, SeekFrom, Write}, + path::{Path, PathBuf}, + }; + + use super::*; + use anyhow::Result; + use arrow::datatypes::{DataType, Field}; + use clap::Parser; + use tempfile::NamedTempFile; + + fn parse_args(mut extra_args: Vec<&str>) -> Result { + let mut args = vec![ + "test", + "--schema", + "test.schema", + "--input-file", + "infile.csv", + "--output-file", + "out.parquet", + ]; + args.append(&mut extra_args); + let args = Args::try_parse_from(args.iter())?; + Ok(args) + } + + #[test] + fn test_parse_arg_minimum() -> Result<()> { + let args = parse_args(vec![])?; + + assert_eq!(args.schema, PathBuf::from(Path::new("test.schema"))); + assert_eq!(args.input_file, PathBuf::from(Path::new("infile.csv"))); + assert_eq!(args.output_file, PathBuf::from(Path::new("out.parquet"))); + // test default values + assert_eq!(args.input_format, CsvDialect::CSV); + assert_eq!(args.batch_size, 1000); + assert_eq!(args.has_header, false); + assert_eq!(args.delimiter, None); + assert_eq!(args.get_delimiter(), b','); + assert_eq!(args.record_terminator, None); + assert_eq!(args.get_terminator(), None); // CRLF + assert_eq!(args.quote_char, None); + assert_eq!(args.get_quote(), Some(b'\"')); + assert_eq!(args.double_quote, None); + assert_eq!(args.parquet_compression, Compression::SNAPPY); + Ok(()) + } + + #[test] + fn test_parse_arg_format_variants() -> Result<()> { + let args = parse_args(vec!["--input-format", "csv"])?; + assert_eq!(args.input_format, CsvDialect::CSV); + assert_eq!(args.get_delimiter(), b','); + assert_eq!(args.get_terminator(), None); // CRLF + assert_eq!(args.get_quote(), Some(b'\"')); + assert_eq!(args.get_quoting(), None); // quoting default: true + assert_eq!(args.get_escape(), None); + let args = parse_args(vec!["--input-format", "tsv"])?; + assert_eq!(args.input_format, CsvDialect::TSV); + assert_eq!(args.get_delimiter(), b'\t'); + assert_eq!(args.get_terminator(), Some(b'\x0a')); // LF + assert_eq!(args.get_quote(), None); // quote none + assert_eq!(args.get_quoting(), Some(false)); + assert_eq!(args.get_escape(), None); + + let args = parse_args(vec!["--input-format", "csv", "--escape-char", "\\"])?; + assert_eq!(args.input_format, CsvDialect::CSV); + assert_eq!(args.get_delimiter(), b','); + assert_eq!(args.get_terminator(), None); // CRLF + assert_eq!(args.get_quote(), Some(b'\"')); + assert_eq!(args.get_quoting(), None); // quoting default: true + assert_eq!(args.get_escape(), Some(b'\\')); + + let args = parse_args(vec!["--input-format", "tsv", "--delimiter", ":"])?; + assert_eq!(args.input_format, CsvDialect::TSV); + assert_eq!(args.get_delimiter(), b':'); + assert_eq!(args.get_terminator(), Some(b'\x0a')); // LF + assert_eq!(args.get_quote(), None); // quote none + assert_eq!(args.get_quoting(), Some(false)); + assert_eq!(args.get_escape(), None); + + Ok(()) + } + + #[test] + 
#[should_panic] + fn test_parse_arg_format_error() { + parse_args(vec!["--input-format", "excel"]).unwrap(); + } + + #[test] + fn test_parse_arg_compression_format() { + let args = parse_args(vec!["--parquet-compression", "uncompressed"]).unwrap(); + assert_eq!(args.parquet_compression, Compression::UNCOMPRESSED); + let args = parse_args(vec!["--parquet-compression", "snappy"]).unwrap(); + assert_eq!(args.parquet_compression, Compression::SNAPPY); + let args = parse_args(vec!["--parquet-compression", "gzip"]).unwrap(); + assert_eq!(args.parquet_compression, Compression::GZIP); + let args = parse_args(vec!["--parquet-compression", "lzo"]).unwrap(); + assert_eq!(args.parquet_compression, Compression::LZO); + let args = parse_args(vec!["--parquet-compression", "lz4"]).unwrap(); + assert_eq!(args.parquet_compression, Compression::LZ4); + let args = parse_args(vec!["--parquet-compression", "brotli"]).unwrap(); + assert_eq!(args.parquet_compression, Compression::BROTLI); + let args = parse_args(vec!["--parquet-compression", "zstd"]).unwrap(); + assert_eq!(args.parquet_compression, Compression::ZSTD); + } + + #[test] + #[should_panic] + fn test_parse_arg_compression_format_fail() { + parse_args(vec!["--parquet-compression", "zip"]).unwrap(); + } + + fn assert_debug_text(debug_text: &str, name: &str, value: &str) { + let pattern = format!(" {}: {}", name, value); + assert!( + debug_text.contains(&pattern), + "\"{}\" not contains \"{}\"", + debug_text, + pattern + ) + } + + #[test] + fn test_configure_reader_builder() { + let args = Args { + schema: PathBuf::from(Path::new("schema.arvo")), + input_file: PathBuf::from(Path::new("test.csv")), + output_file: PathBuf::from(Path::new("out.parquet")), + batch_size: 1000, + input_format: CsvDialect::CSV, + has_header: false, + delimiter: None, + record_terminator: None, + escape_char: None, + quote_char: None, + double_quote: None, + parquet_compression: Compression::SNAPPY, + }; + let arrow_schema = Arc::new(Schema::new(vec![ + Field::new("field1", DataType::Utf8, false), + Field::new("field2", DataType::Utf8, false), + Field::new("field3", DataType::Utf8, false), + Field::new("field4", DataType::Utf8, false), + Field::new("field5", DataType::Utf8, false), + ])); + + let reader_builder = configure_reader_builder(&args, arrow_schema.clone()); + let builder_debug = format!("{:?}", reader_builder); + assert_debug_text(&builder_debug, "has_header", "false"); + assert_debug_text(&builder_debug, "delimiter", "Some(44)"); + assert_debug_text(&builder_debug, "quote", "Some(34)"); + assert_debug_text(&builder_debug, "terminator", "None"); + assert_debug_text(&builder_debug, "batch_size", "1000"); + assert_debug_text(&builder_debug, "escape", "None"); + + let args = Args { + schema: PathBuf::from(Path::new("schema.arvo")), + input_file: PathBuf::from(Path::new("test.csv")), + output_file: PathBuf::from(Path::new("out.parquet")), + batch_size: 2000, + input_format: CsvDialect::TSV, + has_header: true, + delimiter: None, + record_terminator: None, + escape_char: Some('\\'), + quote_char: None, + double_quote: None, + parquet_compression: Compression::SNAPPY, + }; + let arrow_schema = Arc::new(Schema::new(vec![ + Field::new("field1", DataType::Utf8, false), + Field::new("field2", DataType::Utf8, false), + Field::new("field3", DataType::Utf8, false), + Field::new("field4", DataType::Utf8, false), + Field::new("field5", DataType::Utf8, false), + ])); + let reader_builder = configure_reader_builder(&args, arrow_schema.clone()); + let builder_debug = format!("{:?}", 
reader_builder); + assert_debug_text(&builder_debug, "has_header", "true"); + assert_debug_text(&builder_debug, "delimiter", "Some(9)"); + assert_debug_text(&builder_debug, "quote", "None"); + assert_debug_text(&builder_debug, "terminator", "Some(10)"); + assert_debug_text(&builder_debug, "batch_size", "2000"); + assert_debug_text(&builder_debug, "escape", "Some(92)"); + } + + #[test] + fn test_convert_csv_to_parquet() { + let schema = NamedTempFile::new().unwrap(); + let schema_text = r"message schema { + optional int32 id; + optional binary name (STRING); + }"; + schema.as_file().write_all(schema_text.as_bytes()).unwrap(); + + let mut input_file = NamedTempFile::new().unwrap(); + { + let csv = input_file.as_file_mut(); + for index in 1..2000 { + write!(csv, "{},\"name_{}\"\x0d\x0a", index, index).unwrap(); + } + csv.flush().unwrap(); + csv.seek(SeekFrom::Start(0)).unwrap(); + } + let output_parquet = NamedTempFile::new().unwrap(); + + let args = Args { + schema: PathBuf::from(schema.path()), + input_file: PathBuf::from(input_file.path()), + output_file: PathBuf::from(output_parquet.path()), + batch_size: 1000, + input_format: CsvDialect::CSV, + has_header: false, + delimiter: None, + record_terminator: None, + escape_char: None, + quote_char: None, + double_quote: None, + parquet_compression: Compression::SNAPPY, + }; + convert_csv_to_parquet(&args).unwrap(); + } +} From 414c7b7f8617e13f0e4d43ee8be87442968f1b7c Mon Sep 17 00:00:00 2001 From: Kazuhiko Kikuchi Date: Tue, 7 Jun 2022 11:33:17 +0900 Subject: [PATCH 02/13] add `text` for non-rust documentation text --- parquet/src/bin/parquet-fromcsv.rs | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/parquet/src/bin/parquet-fromcsv.rs b/parquet/src/bin/parquet-fromcsv.rs index b596b1429d71..7707c36bd6a1 100644 --- a/parquet/src/bin/parquet-fromcsv.rs +++ b/parquet/src/bin/parquet-fromcsv.rs @@ -20,23 +20,27 @@ //! # Install //! //! `parquet-fromcsv` can be installed using `cargo`: -//! ``` +//! +//! ```text //! cargo install parquet --features=cli //! ``` +//! //! After this `parquet-fromcsv` shoud be available: -//! ``` +//! +//! ```text //! parquet-fromcsv --schema message_schema_for_parquet.txt input.csv output.parquet //! ``` //! //! The binary can also be build form the source code and run as follows: -//! ``` +//! +//! ```text //! cargo run --features=cli --bin parquet-fromcsv --schema message_schema_for_parquet.txt \ //! \ input.csv output.parquet //! ``` //! //! # Options //! -//! ``` +//! ```text //! parquet 14.0.0 //! Apache Arrow //! 
Binary file to converts csv to Parquet file From 7338cc23feb0dcf8385be0ee56df3b006a5a0817 Mon Sep 17 00:00:00 2001 From: kazuhiko kikuchi Date: Wed, 8 Jun 2022 17:21:25 +0900 Subject: [PATCH 03/13] Update parquet/src/bin/parquet-fromcsv.rs Co-authored-by: Raphael Taylor-Davies <1781103+tustvold@users.noreply.github.com> --- parquet/src/bin/parquet-fromcsv.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/parquet/src/bin/parquet-fromcsv.rs b/parquet/src/bin/parquet-fromcsv.rs index 7707c36bd6a1..9522965694ac 100644 --- a/parquet/src/bin/parquet-fromcsv.rs +++ b/parquet/src/bin/parquet-fromcsv.rs @@ -139,7 +139,7 @@ use parquet::{ }; #[derive(Debug, Parser)] -#[clap(author, version, about("Binary file to convert csv to Parquet file"), long_about=None)] +#[clap(author, version, about("Binary to convert csv to Parquet"), long_about=None)] struct Args { /// Parquet schema file path #[clap(short, long, help("message schema for output Parquet"))] From 7174b2d52ca1e55031a5cba0a39bf9be3fc99ed9 Mon Sep 17 00:00:00 2001 From: kazuhiko kikuchi Date: Wed, 8 Jun 2022 17:21:35 +0900 Subject: [PATCH 04/13] Update parquet/src/bin/parquet-fromcsv.rs Co-authored-by: Raphael Taylor-Davies <1781103+tustvold@users.noreply.github.com> --- parquet/src/bin/parquet-fromcsv.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/parquet/src/bin/parquet-fromcsv.rs b/parquet/src/bin/parquet-fromcsv.rs index 9522965694ac..611be90e8a67 100644 --- a/parquet/src/bin/parquet-fromcsv.rs +++ b/parquet/src/bin/parquet-fromcsv.rs @@ -31,7 +31,7 @@ //! parquet-fromcsv --schema message_schema_for_parquet.txt input.csv output.parquet //! ``` //! -//! The binary can also be build form the source code and run as follows: +//! The binary can also be built from the source code and run as follows: //! //! ```text //! 
cargo run --features=cli --bin parquet-fromcsv --schema message_schema_for_parquet.txt \ From 4b406becb1a832a4f685ab7c46c31a60743a6544 Mon Sep 17 00:00:00 2001 From: kazuhiko kikuchi Date: Wed, 8 Jun 2022 17:21:47 +0900 Subject: [PATCH 05/13] Update parquet/src/bin/parquet-fromcsv.rs Co-authored-by: Raphael Taylor-Davies <1781103+tustvold@users.noreply.github.com> --- parquet/src/bin/parquet-fromcsv.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/parquet/src/bin/parquet-fromcsv.rs b/parquet/src/bin/parquet-fromcsv.rs index 611be90e8a67..d5fc39bf10e4 100644 --- a/parquet/src/bin/parquet-fromcsv.rs +++ b/parquet/src/bin/parquet-fromcsv.rs @@ -141,7 +141,7 @@ use parquet::{ #[derive(Debug, Parser)] #[clap(author, version, about("Binary to convert csv to Parquet"), long_about=None)] struct Args { - /// Parquet schema file path + /// Path to a text file containing a parquet schema definition #[clap(short, long, help("message schema for output Parquet"))] schema: PathBuf, /// input CSV file path From 6dbd7ca74bee5788cae711031bb93ec00daf40ce Mon Sep 17 00:00:00 2001 From: kazuhiko kikuchi Date: Wed, 8 Jun 2022 17:23:00 +0900 Subject: [PATCH 06/13] Update parquet/src/bin/parquet-fromcsv.rs Co-authored-by: Raphael Taylor-Davies <1781103+tustvold@users.noreply.github.com> --- parquet/src/bin/parquet-fromcsv.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/parquet/src/bin/parquet-fromcsv.rs b/parquet/src/bin/parquet-fromcsv.rs index d5fc39bf10e4..34b81403463b 100644 --- a/parquet/src/bin/parquet-fromcsv.rs +++ b/parquet/src/bin/parquet-fromcsv.rs @@ -549,7 +549,7 @@ mod tests { { let csv = input_file.as_file_mut(); for index in 1..2000 { - write!(csv, "{},\"name_{}\"\x0d\x0a", index, index).unwrap(); + write!(csv, "{},\"name_{}\"\r\n", index, index).unwrap(); } csv.flush().unwrap(); csv.seek(SeekFrom::Start(0)).unwrap(); From 3ccf7054c3e46952058dec95d9fcd17f5e518afc Mon Sep 17 00:00:00 2001 From: Kazuhiko Kikuchi Date: Wed, 8 Jun 2022 18:34:31 +0900 Subject: [PATCH 07/13] automate update help text --- parquet/src/bin/parquet-fromcsv-help.txt | 61 +++++++++++++++ parquet/src/bin/parquet-fromcsv.rs | 98 +++++------------------- 2 files changed, 81 insertions(+), 78 deletions(-) create mode 100644 parquet/src/bin/parquet-fromcsv-help.txt diff --git a/parquet/src/bin/parquet-fromcsv-help.txt b/parquet/src/bin/parquet-fromcsv-help.txt new file mode 100644 index 000000000000..91c5ff0b115f --- /dev/null +++ b/parquet/src/bin/parquet-fromcsv-help.txt @@ -0,0 +1,61 @@ +parquet 15.0.0 +Apache Arrow +Binary to convert csv to Parquet + +USAGE: + parquet [OPTIONS] --schema --input-file --output-file + +OPTIONS: + -b, --batch-size + batch size + + [env: PARQUET_FROM_CSV_BATCHSIZE=] + [default: 1000] + + -c, --parquet-compression + compression mode + + [default: SNAPPY] + + -d, --delimiter + field delimiter + + default value: when input_format==CSV: ',' when input_format==TSV: 'TAB' + + -D, --double-quote + double quote + + -e, --escape-char + escape charactor + + -f, --input-format + input file format + + [default: csv] + [possible values: csv, tsv] + + -h, --has-header + has header + + --help + Print help information + + -i, --input-file + input CSV file + + -o, --output-file + output Parquet file + + -q, --quote-char + quate charactor + + -r, --record-terminator + record terminator + + [possible values: lf, crlf, cr] + + -s, --schema + message schema for output Parquet + + -V, --version + Print version information diff --git a/parquet/src/bin/parquet-fromcsv.rs 
b/parquet/src/bin/parquet-fromcsv.rs index 34b81403463b..ac4bed73f1c0 100644 --- a/parquet/src/bin/parquet-fromcsv.rs +++ b/parquet/src/bin/parquet-fromcsv.rs @@ -41,67 +41,7 @@ //! # Options //! //! ```text -//! parquet 14.0.0 -//! Apache Arrow -//! Binary file to converts csv to Parquet file -//! -//! USAGE: -//! parquet-fromcsv [OPTIONS] --schema --input-file --output-file -//! -//! OPTIONS: -//! -b, --batch-size -//! batch size -//! -//! [env: PARQUET_FROM_CSV_BATCHSIZE=] -//! [default: 1000] -//! -//! -c, --parquet-compression -//! compression mode -//! -//! [default: SNAPPY] -//! -//! -d, --delimiter -//! field delimiter -//! -//! default value: when input_format==CSV: ',' when input_format==TSV: 'TAB' -//! -//! -D, --double-quote -//! double quote -//! -//! -e, --escape-char -//! escape charactor -//! -//! -f, --input-format -//! input file format -//! -//! [default: csv] -//! [possible values: csv, tsv] -//! -//! -h, --has-header -//! has header -//! -//! --help -//! Print help information -//! -//! -i, --input-file -//! input CSV file -//! -//! -o, --output-file -//! output Parquet file -//! -//! -q, --quote-char -//! quate charactor -//! -//! -r, --record-terminator -//! record terminator -//! -//! [possible values: lf, crlf, cr] -//! -//! -s, --schema -//! message schema for output Parquet -//! -//! -V, --version -//! Print version information +#![doc = include_str!("./parquet-fromcsv-help.txt")] // Update for this file : Run test test_command_help //! ``` //! //! ## Parquet file options @@ -241,16 +181,6 @@ impl Args { self.quote_char.map(|c| c as u8) } } - fn get_quoting(&self) -> Option { - if let Some(_qch) = self.quote_char { - Some(true) - } else { - match self.input_format { - CsvDialect::CSV => None, - CsvDialect::TSV => Some(false), - } - } - } } #[derive(Debug, Clone, Copy, ArgEnum, PartialEq)] @@ -356,9 +286,21 @@ mod tests { use super::*; use anyhow::Result; use arrow::datatypes::{DataType, Field}; - use clap::Parser; + use clap::{CommandFactory, Parser}; use tempfile::NamedTempFile; + #[test] + fn test_command_help() { + let mut cmd = Args::command(); + let dir = std::env::var("CARGO_MANIFEST_DIR").unwrap(); + let mut path_buf = PathBuf::from(dir); + path_buf.push("src"); + path_buf.push("bin"); + path_buf.push("parquet-fromcsv-help.txt"); + let mut help_file = File::create(path_buf).unwrap(); + cmd.write_long_help(&mut help_file); + } + fn parse_args(mut extra_args: Vec<&str>) -> Result { let mut args = vec![ "test", @@ -403,14 +345,12 @@ mod tests { assert_eq!(args.get_delimiter(), b','); assert_eq!(args.get_terminator(), None); // CRLF assert_eq!(args.get_quote(), Some(b'\"')); - assert_eq!(args.get_quoting(), None); // quoting default: true assert_eq!(args.get_escape(), None); let args = parse_args(vec!["--input-format", "tsv"])?; assert_eq!(args.input_format, CsvDialect::TSV); assert_eq!(args.get_delimiter(), b'\t'); assert_eq!(args.get_terminator(), Some(b'\x0a')); // LF assert_eq!(args.get_quote(), None); // quote none - assert_eq!(args.get_quoting(), Some(false)); assert_eq!(args.get_escape(), None); let args = parse_args(vec!["--input-format", "csv", "--escape-char", "\\"])?; @@ -418,7 +358,6 @@ mod tests { assert_eq!(args.get_delimiter(), b','); assert_eq!(args.get_terminator(), None); // CRLF assert_eq!(args.get_quote(), Some(b'\"')); - assert_eq!(args.get_quoting(), None); // quoting default: true assert_eq!(args.get_escape(), Some(b'\\')); let args = parse_args(vec!["--input-format", "tsv", "--delimiter", ":"])?; @@ -426,7 +365,6 @@ mod tests { 
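        // an explicit --delimiter overrides the TSV dialect default (TAB)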
assert_eq!(args.get_delimiter(), b':'); assert_eq!(args.get_terminator(), Some(b'\x0a')); // LF assert_eq!(args.get_quote(), None); // quote none - assert_eq!(args.get_quoting(), Some(false)); assert_eq!(args.get_escape(), None); Ok(()) @@ -457,9 +395,13 @@ mod tests { } #[test] - #[should_panic] fn test_parse_arg_compression_format_fail() { - parse_args(vec!["--parquet-compression", "zip"]).unwrap(); + match parse_args(vec!["--parquet-compression", "zip"]) { + Ok(_) => panic!("unexpected success"), + Err(e) => assert_eq!( + format!("{}", e), + "error: Invalid value \"zip\" for '--parquet-compression ': Unknown compression ZIP : possible values UNCOMPRESSED, SNAPPY, GZIP, LZO, BROTLI, LZ4, ZSTD \n\nFor more information try --help\n"), + } } fn assert_debug_text(debug_text: &str, name: &str, value: &str) { From d91bfde0530671c5ad993567119eb02024cb69cd Mon Sep 17 00:00:00 2001 From: Kazuhiko Kikuchi Date: Thu, 9 Jun 2022 12:18:36 +0900 Subject: [PATCH 08/13] remove anyhow --- parquet/Cargo.toml | 3 +- parquet/src/bin/parquet-fromcsv.rs | 129 +++++++++++++++++++++++------ 2 files changed, 104 insertions(+), 28 deletions(-) diff --git a/parquet/Cargo.toml b/parquet/Cargo.toml index 9f96c04d2a02..eb7b9343414c 100644 --- a/parquet/Cargo.toml +++ b/parquet/Cargo.toml @@ -49,7 +49,6 @@ serde_json = { version = "1.0", features = ["preserve_order"], optional = true } rand = "0.8" futures = { version = "0.3", optional = true } tokio = { version = "1.0", optional = true, default-features = false, features = ["macros", "fs", "rt", "io-util"] } -anyhow = { version = "1.0.57", optional = true } [dev-dependencies] criterion = "0.3" @@ -67,7 +66,7 @@ all-features = true [features] default = ["arrow", "snap", "brotli", "flate2", "lz4", "zstd", "base64"] -cli = ["serde_json", "base64", "clap", "anyhow", "arrow/csv"] +cli = ["serde_json", "base64", "clap", "arrow/csv"] test_common = [] # Experimental, unstable functionality primarily used for testing experimental = [] diff --git a/parquet/src/bin/parquet-fromcsv.rs b/parquet/src/bin/parquet-fromcsv.rs index ac4bed73f1c0..8b212a3c13e7 100644 --- a/parquet/src/bin/parquet-fromcsv.rs +++ b/parquet/src/bin/parquet-fromcsv.rs @@ -63,21 +63,82 @@ //! 
use std::{ + fmt::Display, fs::{read_to_string, File}, path::{Path, PathBuf}, sync::Arc, }; -use anyhow::{bail, Context, Error, Result}; -use arrow::{csv::ReaderBuilder, datatypes::Schema}; +use arrow::{csv::ReaderBuilder, datatypes::Schema, error::ArrowError}; use clap::{ArgEnum, Parser}; use parquet::{ arrow::{parquet_to_arrow_schema, ArrowWriter}, basic::Compression, + errors::ParquetError, file::properties::WriterProperties, schema::{parser::parse_message_type, types::SchemaDescriptor}, }; +#[derive(Debug)] +enum ParquetFromCsvError { + CommandLineParseError(clap::Error), + IoError(std::io::Error), + ArrowError(ArrowError), + ParquetError(ParquetError), + WithContext(String, Box), +} + +impl From for ParquetFromCsvError { + fn from(e: std::io::Error) -> Self { + Self::IoError(e) + } +} + +impl From for ParquetFromCsvError { + fn from(e: ArrowError) -> Self { + Self::ArrowError(e) + } +} + +impl From for ParquetFromCsvError { + fn from(e: ParquetError) -> Self { + Self::ParquetError(e) + } +} + +impl From for ParquetFromCsvError { + fn from(e: clap::Error) -> Self { + Self::CommandLineParseError(e) + } +} + +impl ParquetFromCsvError { + pub fn with_context>( + inner_error: E, + context: &str, + ) -> ParquetFromCsvError { + let inner = inner_error.into(); + ParquetFromCsvError::WithContext(context.to_string(), Box::new(inner)) + } +} + +impl Display for ParquetFromCsvError { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + ParquetFromCsvError::CommandLineParseError(e) => + write!(f, "{}", e), + ParquetFromCsvError::IoError(e) => + write!(f, "{}", e), + ParquetFromCsvError::ArrowError(e) => write!(f, "{}", e), + ParquetFromCsvError::ParquetError(e) => write!(f, "{}", e), + ParquetFromCsvError::WithContext(c, e) => { + write!(f, "{}\n", e)?; + write!(f,"context: {}", c) + } + } + } +} + #[derive(Debug, Parser)] #[clap(author, version, about("Binary to convert csv to Parquet"), long_about=None)] struct Args { @@ -131,7 +192,7 @@ struct Args { parquet_compression: Compression, } -fn compression_from_str(cmp: &str) -> Result { +fn compression_from_str(cmp: &str) -> Result { match cmp.to_uppercase().as_str() { "UNCOMPRESSED" => Ok(Compression::UNCOMPRESSED), "SNAPPY" => Ok(Compression::SNAPPY), @@ -140,7 +201,9 @@ fn compression_from_str(cmp: &str) -> Result { "BROTLI" => Ok(Compression::BROTLI), "LZ4" => Ok(Compression::LZ4), "ZSTD" => Ok(Compression::ZSTD), - v => bail!("Unknown compression {0} : possible values UNCOMPRESSED, SNAPPY, GZIP, LZO, BROTLI, LZ4, ZSTD ", v), + v => Err( + format!("Unknown compression {0} : possible values UNCOMPRESSED, SNAPPY, GZIP, LZO, BROTLI, LZ4, ZSTD ",v) + ) } } @@ -231,47 +294,62 @@ fn configure_reader_builder(args: &Args, arrow_schema: Arc) -> ReaderBui builder } -fn arrow_schema_from_string(schema: &str) -> Result> { +fn arrow_schema_from_string(schema: &str) -> Result, ParquetFromCsvError> { let schema = Arc::new(parse_message_type(&schema)?); let desc = SchemaDescriptor::new(schema); let arrow_schema = Arc::new(parquet_to_arrow_schema(&desc, None)?); Ok(arrow_schema) } -fn convert_csv_to_parquet(args: &Args) -> Result<()> { - let schema = read_to_string(args.schema_path()).with_context(|| { - format!("Failed to open schema file {:#?}", args.schema_path()) +fn convert_csv_to_parquet(args: &Args) -> Result<(), ParquetFromCsvError> { + let schema = read_to_string(args.schema_path()).map_err(|e| { + ParquetFromCsvError::with_context( + e, + &format!("Failed to open schema file {:#?}", args.schema_path()), + ) })?; let 
arrow_schema = arrow_schema_from_string(&schema)?; // create output parquet writer - let parquet_file = File::create(&args.output_file).context(format!( - "Failed to create output file {:#?}", - &args.output_file - ))?; + let parquet_file = File::create(&args.output_file).map_err(|e| { + ParquetFromCsvError::with_context( + e, + &format!("Failed to create output file {:#?}", &args.output_file), + ) + })?; let writer_properties = Some(configure_writer_properties(args.parquet_compression)); let mut arrow_writer = ArrowWriter::try_new(parquet_file, arrow_schema.clone(), writer_properties) - .context("Failed to create ArrowWriter")?; + .map_err(|e| { + ParquetFromCsvError::with_context(e, "Failed to create ArrowWriter") + })?; // open input file - let input_file = File::open(&args.input_file) - .with_context(|| format!("Failed to open input file {:#?}", &args.input_file))?; + let input_file = File::open(&args.input_file).map_err(|e| { + ParquetFromCsvError::with_context( + e, + &format!("Failed to open input file {:#?}", &args.input_file), + ) + })?; // create input csv reader let builder = configure_reader_builder(&args, arrow_schema); let reader = builder.build(input_file)?; for batch_result in reader { - let batch = batch_result.context("Failed to read RecordBatch from CSV")?; - arrow_writer - .write(&batch) - .context("Failed to write RecordBatch to parquet")?; + let batch = batch_result.map_err(|e| { + ParquetFromCsvError::with_context(e, "Failed to read RecordBatch from CSV") + })?; + arrow_writer.write(&batch).map_err(|e| { + ParquetFromCsvError::with_context(e, "Failed to write RecordBatch to parquet") + })?; } - arrow_writer.close().context("Failed to close parquet")?; + arrow_writer + .close() + .map_err(|e| ParquetFromCsvError::with_context(e, "Failed to close parquet"))?; Ok(()) } -fn main() -> Result<()> { +fn main() -> Result<(), ParquetFromCsvError> { let args = Args::parse(); convert_csv_to_parquet(&args) } @@ -284,7 +362,6 @@ mod tests { }; use super::*; - use anyhow::Result; use arrow::datatypes::{DataType, Field}; use clap::{CommandFactory, Parser}; use tempfile::NamedTempFile; @@ -298,10 +375,10 @@ mod tests { path_buf.push("bin"); path_buf.push("parquet-fromcsv-help.txt"); let mut help_file = File::create(path_buf).unwrap(); - cmd.write_long_help(&mut help_file); + cmd.write_long_help(&mut help_file).unwrap(); } - fn parse_args(mut extra_args: Vec<&str>) -> Result { + fn parse_args(mut extra_args: Vec<&str>) -> Result { let mut args = vec![ "test", "--schema", @@ -317,7 +394,7 @@ mod tests { } #[test] - fn test_parse_arg_minimum() -> Result<()> { + fn test_parse_arg_minimum() -> Result<(), ParquetFromCsvError> { let args = parse_args(vec![])?; assert_eq!(args.schema, PathBuf::from(Path::new("test.schema"))); @@ -339,7 +416,7 @@ mod tests { } #[test] - fn test_parse_arg_format_variants() -> Result<()> { + fn test_parse_arg_format_variants() -> Result<(), ParquetFromCsvError> { let args = parse_args(vec!["--input-format", "csv"])?; assert_eq!(args.input_format, CsvDialect::CSV); assert_eq!(args.get_delimiter(), b','); From 71d0f8fa3b50e7ecd47839186c0700c24b231909 Mon Sep 17 00:00:00 2001 From: Kazuhiko Kikuchi Date: Fri, 10 Jun 2022 13:14:31 +0900 Subject: [PATCH 09/13] add rat_exclude_files --- dev/release/rat_exclude_files.txt | 1 + 1 file changed, 1 insertion(+) diff --git a/dev/release/rat_exclude_files.txt b/dev/release/rat_exclude_files.txt index c7996a78af86..466f6fa45267 100644 --- a/dev/release/rat_exclude_files.txt +++ b/dev/release/rat_exclude_files.txt @@ -20,3 
+20,4 @@ conbench/.isort.cfg arrow-flight/src/arrow.flight.protocol.rs arrow-flight/src/sql/arrow.flight.protocol.sql.rs .github/* +parquet/src/bin/parquet-fromcsv-help.txt From 7a1e80fac6bcda42e46059eeb3cd72be071cb340 Mon Sep 17 00:00:00 2001 From: Kazuhiko Kikuchi Date: Fri, 10 Jun 2022 13:28:05 +0900 Subject: [PATCH 10/13] update test_command_help --- parquet/src/bin/parquet-fromcsv.rs | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/parquet/src/bin/parquet-fromcsv.rs b/parquet/src/bin/parquet-fromcsv.rs index 8b212a3c13e7..0a960e46af46 100644 --- a/parquet/src/bin/parquet-fromcsv.rs +++ b/parquet/src/bin/parquet-fromcsv.rs @@ -374,8 +374,12 @@ mod tests { path_buf.push("src"); path_buf.push("bin"); path_buf.push("parquet-fromcsv-help.txt"); - let mut help_file = File::create(path_buf).unwrap(); - cmd.write_long_help(&mut help_file).unwrap(); + let expected = std::fs::read_to_string(path_buf).unwrap(); + let mut buffer_vec = Vec::new(); + let mut buffer = std::io::Cursor::new(&mut buffer_vec); + cmd.write_long_help(&mut buffer).unwrap(); + let actual = String::from_utf8(buffer_vec).unwrap(); + assert_eq!( expected, actual, "help text not match. please update to \n---\n{}\n---\n", actual) } fn parse_args(mut extra_args: Vec<&str>) -> Result { From ddeeff27f365ce7f1792a1fca450a966a07f2d0f Mon Sep 17 00:00:00 2001 From: Kazuhiko Kikuchi Date: Fri, 10 Jun 2022 13:33:16 +0900 Subject: [PATCH 11/13] fix clippy warnings --- parquet/src/bin/parquet-fromcsv.rs | 44 +++++++++++++++--------------- 1 file changed, 22 insertions(+), 22 deletions(-) diff --git a/parquet/src/bin/parquet-fromcsv.rs b/parquet/src/bin/parquet-fromcsv.rs index 0a960e46af46..04347135e8ea 100644 --- a/parquet/src/bin/parquet-fromcsv.rs +++ b/parquet/src/bin/parquet-fromcsv.rs @@ -132,7 +132,7 @@ impl Display for ParquetFromCsvError { ParquetFromCsvError::ArrowError(e) => write!(f, "{}", e), ParquetFromCsvError::ParquetError(e) => write!(f, "{}", e), ParquetFromCsvError::WithContext(c, e) => { - write!(f, "{}\n", e)?; + writeln!(f, "{}", e)?; write!(f,"context: {}", c) } } @@ -157,7 +157,7 @@ struct Args { short('f'), long, help("input file format"), - default_value_t=CsvDialect::CSV + default_value_t=CsvDialect::Csv )] input_format: CsvDialect, /// batch size @@ -215,8 +215,8 @@ impl Args { match self.delimiter { Some(ch) => ch as u8, None => match self.input_format { - CsvDialect::CSV => b',', - CsvDialect::TSV => b'\t', + CsvDialect::Csv => b',', + CsvDialect::Tsv => b'\t', }, } } @@ -224,10 +224,10 @@ impl Args { match self.record_terminator { Some(RecordTerminator::LF) => Some(0x0a), Some(RecordTerminator::CR) => Some(0x0d), - Some(RecordTerminator::CRLF) => None, + Some(RecordTerminator::Crlf) => None, None => match self.input_format { - CsvDialect::CSV => None, - CsvDialect::TSV => Some(0x0a), + CsvDialect::Csv => None, + CsvDialect::Tsv => Some(0x0a), }, } } @@ -237,8 +237,8 @@ impl Args { fn get_quote(&self) -> Option { if self.quote_char.is_none() { match self.input_format { - CsvDialect::CSV => Some(b'\"'), - CsvDialect::TSV => None, + CsvDialect::Csv => Some(b'\"'), + CsvDialect::Tsv => None, } } else { self.quote_char.map(|c| c as u8) @@ -248,14 +248,14 @@ impl Args { #[derive(Debug, Clone, Copy, ArgEnum, PartialEq)] enum CsvDialect { - CSV, - TSV, + Csv, + Tsv, } #[derive(Debug, Clone, Copy, ArgEnum, PartialEq)] enum RecordTerminator { LF, - CRLF, + Crlf, CR, } @@ -295,7 +295,7 @@ fn configure_reader_builder(args: &Args, arrow_schema: Arc) -> ReaderBui } fn 
arrow_schema_from_string(schema: &str) -> Result, ParquetFromCsvError> { - let schema = Arc::new(parse_message_type(&schema)?); + let schema = Arc::new(parse_message_type(schema)?); let desc = SchemaDescriptor::new(schema); let arrow_schema = Arc::new(parquet_to_arrow_schema(&desc, None)?); Ok(arrow_schema) @@ -333,7 +333,7 @@ fn convert_csv_to_parquet(args: &Args) -> Result<(), ParquetFromCsvError> { ) })?; // create input csv reader - let builder = configure_reader_builder(&args, arrow_schema); + let builder = configure_reader_builder(args, arrow_schema); let reader = builder.build(input_file)?; for batch_result in reader { let batch = batch_result.map_err(|e| { @@ -405,7 +405,7 @@ mod tests { assert_eq!(args.input_file, PathBuf::from(Path::new("infile.csv"))); assert_eq!(args.output_file, PathBuf::from(Path::new("out.parquet"))); // test default values - assert_eq!(args.input_format, CsvDialect::CSV); + assert_eq!(args.input_format, CsvDialect::Csv); assert_eq!(args.batch_size, 1000); assert_eq!(args.has_header, false); assert_eq!(args.delimiter, None); @@ -422,27 +422,27 @@ mod tests { #[test] fn test_parse_arg_format_variants() -> Result<(), ParquetFromCsvError> { let args = parse_args(vec!["--input-format", "csv"])?; - assert_eq!(args.input_format, CsvDialect::CSV); + assert_eq!(args.input_format, CsvDialect::Csv); assert_eq!(args.get_delimiter(), b','); assert_eq!(args.get_terminator(), None); // CRLF assert_eq!(args.get_quote(), Some(b'\"')); assert_eq!(args.get_escape(), None); let args = parse_args(vec!["--input-format", "tsv"])?; - assert_eq!(args.input_format, CsvDialect::TSV); + assert_eq!(args.input_format, CsvDialect::Tsv); assert_eq!(args.get_delimiter(), b'\t'); assert_eq!(args.get_terminator(), Some(b'\x0a')); // LF assert_eq!(args.get_quote(), None); // quote none assert_eq!(args.get_escape(), None); let args = parse_args(vec!["--input-format", "csv", "--escape-char", "\\"])?; - assert_eq!(args.input_format, CsvDialect::CSV); + assert_eq!(args.input_format, CsvDialect::Csv); assert_eq!(args.get_delimiter(), b','); assert_eq!(args.get_terminator(), None); // CRLF assert_eq!(args.get_quote(), Some(b'\"')); assert_eq!(args.get_escape(), Some(b'\\')); let args = parse_args(vec!["--input-format", "tsv", "--delimiter", ":"])?; - assert_eq!(args.input_format, CsvDialect::TSV); + assert_eq!(args.input_format, CsvDialect::Tsv); assert_eq!(args.get_delimiter(), b':'); assert_eq!(args.get_terminator(), Some(b'\x0a')); // LF assert_eq!(args.get_quote(), None); // quote none @@ -502,7 +502,7 @@ mod tests { input_file: PathBuf::from(Path::new("test.csv")), output_file: PathBuf::from(Path::new("out.parquet")), batch_size: 1000, - input_format: CsvDialect::CSV, + input_format: CsvDialect::Csv, has_header: false, delimiter: None, record_terminator: None, @@ -533,7 +533,7 @@ mod tests { input_file: PathBuf::from(Path::new("test.csv")), output_file: PathBuf::from(Path::new("out.parquet")), batch_size: 2000, - input_format: CsvDialect::TSV, + input_format: CsvDialect::Tsv, has_header: true, delimiter: None, record_terminator: None, @@ -584,7 +584,7 @@ mod tests { input_file: PathBuf::from(input_file.path()), output_file: PathBuf::from(output_parquet.path()), batch_size: 1000, - input_format: CsvDialect::CSV, + input_format: CsvDialect::Csv, has_header: false, delimiter: None, record_terminator: None, From ce3cefdc008968406674f14904c83ba2a5cd68da Mon Sep 17 00:00:00 2001 From: Kazuhiko Kikuchi Date: Fri, 10 Jun 2022 14:07:46 +0900 Subject: [PATCH 12/13] add writer-version, max-row-group-size 
arg
---
 parquet/src/bin/parquet-fromcsv-help.txt |  6 +++
 parquet/src/bin/parquet-fromcsv.rs       | 53 ++++++++++++++++++------
 2 files changed, 47 insertions(+), 12 deletions(-)

diff --git a/parquet/src/bin/parquet-fromcsv-help.txt b/parquet/src/bin/parquet-fromcsv-help.txt
index 91c5ff0b115f..a087b4fda8ed 100644
--- a/parquet/src/bin/parquet-fromcsv-help.txt
+++ b/parquet/src/bin/parquet-fromcsv-help.txt
@@ -43,6 +43,9 @@ OPTIONS:
     -i, --input-file <INPUT_FILE>
             input CSV file
 
+    -m, --max-row-group-size <MAX_ROW_GROUP_SIZE>
+            max row group size
+
     -o, --output-file <OUTPUT_FILE>
             output Parquet file
 
@@ -59,3 +62,6 @@ OPTIONS:
 
     -V, --version
             Print version information
+
+    -w, --writer-version <WRITER_VERSION>
+            writer version
diff --git a/parquet/src/bin/parquet-fromcsv.rs b/parquet/src/bin/parquet-fromcsv.rs
index 04347135e8ea..807f0b930cc2 100644
--- a/parquet/src/bin/parquet-fromcsv.rs
+++ b/parquet/src/bin/parquet-fromcsv.rs
@@ -48,18 +48,20 @@
 //!
 //! - `-b`, `--batch-size` : Batch size for Parquet
 //! - `-c`, `--parquet-compression` : Compression option for Parquet, default is SNAPPY
-//! - `-s`, `--schema` : path to message schema for generated Parquet file
-//! - `-o`, `--output-file` : path to output parquet file
-//!
+//! - `-s`, `--schema` : Path to message schema for generated Parquet file
+//! - `-o`, `--output-file` : Path to output Parquet file
+//! - `-w`, `--writer-version` : Writer version 
+//! - `-m`, `--max-row-group-size` : Max row group size
+//! 
 //! ## Input file options
 //!
-//! - `-i`, `--input-file` : path to input CSV file
-//! - `-f`, `--input-format` : dialect for input file, `csv` or `tsv`.
+//! - `-i`, `--input-file` : Path to input CSV file
+//! - `-f`, `--input-format` : Dialect for input file, `csv` or `tsv`.
 //! - `-d`, `--delimiter : Field delimitor for CSV file, default depends `--input-format`
 //! - `-e`, `--escape` : Escape charactor for input file
-//! - `-h`, `--has-header` : input has header
-//! - `-r`, `--record-terminator` : record terminator charactor for input. default is CRLF
-//! - `-q`, `--quote-char` : input quoting charactor
+//! - `-h`, `--has-header` : Input has header
+//! - `-r`, `--record-terminator` : Record terminator charactor for input. default is CRLF
+//! - `-q`, `--quote-char` : Input quoting charactor
 //! 
use std::{ @@ -75,7 +77,7 @@ use parquet::{ arrow::{parquet_to_arrow_schema, ArrowWriter}, basic::Compression, errors::ParquetError, - file::properties::WriterProperties, + file::properties::{WriterProperties, WriterVersion}, schema::{parser::parse_message_type, types::SchemaDescriptor}, }; @@ -190,6 +192,12 @@ struct Args { #[clap(short('c'), long, help("compression mode"), default_value_t=Compression::SNAPPY)] #[clap(parse(try_from_str =compression_from_str))] parquet_compression: Compression, + + #[clap(short,long, help("writer version"))] + #[clap(parse(try_from_str =writer_version_from_str))] + writer_version: Option, + #[clap(short,long, help("max row group size"))] + max_row_group_size: Option } fn compression_from_str(cmp: &str) -> Result { @@ -207,6 +215,15 @@ fn compression_from_str(cmp: &str) -> Result { } } +fn writer_version_from_str(cmp: &str) -> Result { + match cmp.to_uppercase().as_str() { + "1" => Ok(WriterVersion::PARQUET_1_0), + "2" => Ok(WriterVersion::PARQUET_2_0), + v => Err(format!("Unknown writer version {0} : possible values 1, 2",v)) + } +} + + impl Args { fn schema_path(&self) -> &Path { self.schema.as_path() @@ -259,8 +276,14 @@ enum RecordTerminator { CR, } -fn configure_writer_properties(compression: Compression) -> WriterProperties { - let properties_builder = WriterProperties::builder().set_compression(compression); +fn configure_writer_properties(args: &Args) -> WriterProperties { + let mut properties_builder = WriterProperties::builder().set_compression(args.parquet_compression); + if let Some(writer_version) = args.writer_version { + properties_builder = properties_builder.set_writer_version( writer_version ); + } + if let Some(max_row_group_size) = args.max_row_group_size { + properties_builder = properties_builder.set_max_row_group_size(max_row_group_size); + } properties_builder.build() } @@ -318,7 +341,7 @@ fn convert_csv_to_parquet(args: &Args) -> Result<(), ParquetFromCsvError> { ) })?; - let writer_properties = Some(configure_writer_properties(args.parquet_compression)); + let writer_properties = Some(configure_writer_properties(args)); let mut arrow_writer = ArrowWriter::try_new(parquet_file, arrow_schema.clone(), writer_properties) .map_err(|e| { @@ -510,6 +533,8 @@ mod tests { quote_char: None, double_quote: None, parquet_compression: Compression::SNAPPY, + writer_version: None, + max_row_group_size: None }; let arrow_schema = Arc::new(Schema::new(vec![ Field::new("field1", DataType::Utf8, false), @@ -541,6 +566,8 @@ mod tests { quote_char: None, double_quote: None, parquet_compression: Compression::SNAPPY, + writer_version: None, + max_row_group_size: None }; let arrow_schema = Arc::new(Schema::new(vec![ Field::new("field1", DataType::Utf8, false), @@ -592,6 +619,8 @@ mod tests { quote_char: None, double_quote: None, parquet_compression: Compression::SNAPPY, + writer_version: None, + max_row_group_size: None }; convert_csv_to_parquet(&args).unwrap(); } From 312a5967cb5006f2659b6cb14f94cafdce2a6be6 Mon Sep 17 00:00:00 2001 From: Kazuhiko Kikuchi Date: Fri, 10 Jun 2022 14:45:50 +0900 Subject: [PATCH 13/13] fix cargo fmt lint --- parquet/src/bin/parquet-fromcsv.rs | 46 +++++++++++++++++------------- 1 file changed, 26 insertions(+), 20 deletions(-) diff --git a/parquet/src/bin/parquet-fromcsv.rs b/parquet/src/bin/parquet-fromcsv.rs index 807f0b930cc2..03303fb8b432 100644 --- a/parquet/src/bin/parquet-fromcsv.rs +++ b/parquet/src/bin/parquet-fromcsv.rs @@ -50,9 +50,9 @@ //! 
- `-c`, `--parquet-compression` : Compression option for Parquet, default is SNAPPY //! - `-s`, `--schema` : Path to message schema for generated Parquet file //! - `-o`, `--output-file` : Path to output Parquet file -//! - `-w`, `--writer-version` : Writer version +//! - `-w`, `--writer-version` : Writer version //! - `-m`, `--max-row-group-size` : Max row group size -//! +//! //! ## Input file options //! //! - `-i`, `--input-file` : Path to input CSV file @@ -127,15 +127,13 @@ impl ParquetFromCsvError { impl Display for ParquetFromCsvError { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { match self { - ParquetFromCsvError::CommandLineParseError(e) => - write!(f, "{}", e), - ParquetFromCsvError::IoError(e) => - write!(f, "{}", e), + ParquetFromCsvError::CommandLineParseError(e) => write!(f, "{}", e), + ParquetFromCsvError::IoError(e) => write!(f, "{}", e), ParquetFromCsvError::ArrowError(e) => write!(f, "{}", e), ParquetFromCsvError::ParquetError(e) => write!(f, "{}", e), ParquetFromCsvError::WithContext(c, e) => { writeln!(f, "{}", e)?; - write!(f,"context: {}", c) + write!(f, "context: {}", c) } } } @@ -193,11 +191,11 @@ struct Args { #[clap(parse(try_from_str =compression_from_str))] parquet_compression: Compression, - #[clap(short,long, help("writer version"))] + #[clap(short, long, help("writer version"))] #[clap(parse(try_from_str =writer_version_from_str))] writer_version: Option, - #[clap(short,long, help("max row group size"))] - max_row_group_size: Option + #[clap(short, long, help("max row group size"))] + max_row_group_size: Option, } fn compression_from_str(cmp: &str) -> Result { @@ -209,7 +207,7 @@ fn compression_from_str(cmp: &str) -> Result { "BROTLI" => Ok(Compression::BROTLI), "LZ4" => Ok(Compression::LZ4), "ZSTD" => Ok(Compression::ZSTD), - v => Err( + v => Err( format!("Unknown compression {0} : possible values UNCOMPRESSED, SNAPPY, GZIP, LZO, BROTLI, LZ4, ZSTD ",v) ) } @@ -219,11 +217,13 @@ fn writer_version_from_str(cmp: &str) -> Result { match cmp.to_uppercase().as_str() { "1" => Ok(WriterVersion::PARQUET_1_0), "2" => Ok(WriterVersion::PARQUET_2_0), - v => Err(format!("Unknown writer version {0} : possible values 1, 2",v)) + v => Err(format!( + "Unknown writer version {0} : possible values 1, 2", + v + )), } } - impl Args { fn schema_path(&self) -> &Path { self.schema.as_path() @@ -277,12 +277,14 @@ enum RecordTerminator { } fn configure_writer_properties(args: &Args) -> WriterProperties { - let mut properties_builder = WriterProperties::builder().set_compression(args.parquet_compression); + let mut properties_builder = + WriterProperties::builder().set_compression(args.parquet_compression); if let Some(writer_version) = args.writer_version { - properties_builder = properties_builder.set_writer_version( writer_version ); + properties_builder = properties_builder.set_writer_version(writer_version); } if let Some(max_row_group_size) = args.max_row_group_size { - properties_builder = properties_builder.set_max_row_group_size(max_row_group_size); + properties_builder = + properties_builder.set_max_row_group_size(max_row_group_size); } properties_builder.build() } @@ -402,7 +404,11 @@ mod tests { let mut buffer = std::io::Cursor::new(&mut buffer_vec); cmd.write_long_help(&mut buffer).unwrap(); let actual = String::from_utf8(buffer_vec).unwrap(); - assert_eq!( expected, actual, "help text not match. please update to \n---\n{}\n---\n", actual) + assert_eq!( + expected, actual, + "help text not match. 
please update to \n---\n{}\n---\n", + actual + ) } fn parse_args(mut extra_args: Vec<&str>) -> Result { @@ -534,7 +540,7 @@ mod tests { double_quote: None, parquet_compression: Compression::SNAPPY, writer_version: None, - max_row_group_size: None + max_row_group_size: None, }; let arrow_schema = Arc::new(Schema::new(vec![ Field::new("field1", DataType::Utf8, false), @@ -567,7 +573,7 @@ mod tests { double_quote: None, parquet_compression: Compression::SNAPPY, writer_version: None, - max_row_group_size: None + max_row_group_size: None, }; let arrow_schema = Arc::new(Schema::new(vec![ Field::new("field1", DataType::Utf8, false), @@ -620,7 +626,7 @@ mod tests { double_quote: None, parquet_compression: Compression::SNAPPY, writer_version: None, - max_row_group_size: None + max_row_group_size: None, }; convert_csv_to_parquet(&args).unwrap(); }