
upgrade to clap 3.0
jimexist committed Feb 3, 2022
1 parent 777f375 commit 4db0445
Showing 8 changed files with 257 additions and 238 deletions.
2 changes: 1 addition & 1 deletion arrow-parquet-integration-testing/Cargo.toml
@@ -5,7 +5,7 @@ authors = ["Jorge C. Leitao <[email protected]>"]
edition = "2021"

[dependencies]
clap = "^2.33"
clap = { version = "^3", features = ["derive"] }
arrow2 = { path = "../", default-features = false, features = ["io_parquet", "io_json_integration", "io_parquet_compression"] }
flate2 = "^1"
serde = { version = "^1.0", features = ["rc"] }
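The newly enabled `derive` feature is what makes the struct-based CLI definitions in the rest of this commit possible. As a rough standalone sketch of that style (illustrative names, not code from this repository):

```rust
// Minimal clap 3 "derive" sketch; `Args` and its fields are illustrative only.
use clap::Parser;

#[derive(Debug, Parser)]
struct Args {
    /// Path to the input file
    #[clap(short, long)] // becomes a required `-i, --input <INPUT>` flag
    input: String,

    /// How many times to repeat the work
    #[clap(long, default_value_t = 1)] // optional, defaults to 1
    repeat: usize,
}

fn main() {
    let args = Args::parse();
    println!("input = {}, repeat = {}", args.input, args.repeat);
}
```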
152 changes: 70 additions & 82 deletions arrow-parquet-integration-testing/src/main.rs
@@ -1,7 +1,3 @@
use std::fs::File;
use std::sync::Arc;
use std::{collections::HashMap, io::Read};

use arrow2::array::Array;
use arrow2::io::ipc::IpcField;
use arrow2::{
@@ -12,14 +8,16 @@ use arrow2::{
json_integration::read,
json_integration::ArrowJson,
parquet::write::{
Compression, Encoding, FileWriter, RowGroupIterator, Version, WriteOptions,
Compression as ParquetCompression, Encoding, FileWriter, RowGroupIterator,
Version as ParquetVersion, WriteOptions,
},
},
};

use clap::{App, Arg};

use clap::Parser;
use flate2::read::GzDecoder;
use std::fs::File;
use std::sync::Arc;
use std::{collections::HashMap, io::Read};

/// Read gzipped JSON file
pub fn read_gzip_json(
@@ -59,69 +57,72 @@ pub fn read_gzip_json(
Ok((schema, ipc_fields, batches))
}

#[derive(clap::ArgEnum, Debug, Clone)]
enum Version {
#[clap(name = "1")]
V1,
#[clap(name = "2")]
V2,
}

impl Into<ParquetVersion> for Version {
fn into(self) -> ParquetVersion {
match self {
Version::V1 => ParquetVersion::V1,
Version::V2 => ParquetVersion::V2,
}
}
}

#[derive(clap::ArgEnum, Debug, Clone)]
enum Compression {
Zstd,
Snappy,
Uncompressed,
}

impl Into<ParquetCompression> for Compression {
fn into(self) -> ParquetCompression {
match self {
Compression::Zstd => ParquetCompression::Zstd,
Compression::Snappy => ParquetCompression::Snappy,
Compression::Uncompressed => ParquetCompression::Uncompressed,
}
}
}

#[derive(clap::ArgEnum, PartialEq, Debug, Clone)]
enum EncodingScheme {
Plain,
Delta,
}

#[derive(Debug, Parser)]
struct Args {
#[clap(short, long, help = "Path to JSON file")]
json: String,
#[clap(short('o'), long("output"), help = "Path to write parquet file")]
write_path: String,
#[clap(short, long, arg_enum, help = "Parquet version", default_value_t = Version::V2)]
version: Version,
#[clap(short, long, help = "commas separated projection")]
projection: Option<String>,
#[clap(short, long, arg_enum, help = "encoding scheme for utf8")]
encoding_utf8: EncodingScheme,
#[clap(short, long, arg_enum)]
compression: Compression,
}

fn main() -> Result<()> {
let matches = App::new("json-parquet-integration")
.arg(
Arg::with_name("json")
.long("json")
.required(true)
.takes_value(true),
)
.arg(
Arg::with_name("write_path")
.long("output")
.required(true)
.takes_value(true),
)
.arg(
Arg::with_name("version")
.long("version")
.required(true)
.takes_value(true),
)
.arg(
Arg::with_name("projection")
.long("projection")
.required(false)
.takes_value(true),
)
.arg(
Arg::with_name("encoding-utf8")
.long("encoding-utf8")
.required(true)
.takes_value(true),
)
.arg(
Arg::with_name("compression")
.long("compression")
.required(true)
.takes_value(true),
)
.get_matches();
let json_file = matches
.value_of("json")
.expect("must provide path to json file");
let write_path = matches
.value_of("write_path")
.expect("must provide path to write parquet");
let version = matches
.value_of("version")
.expect("must provide version of parquet");
let projection = matches.value_of("projection");
let utf8_encoding = matches
.value_of("encoding-utf8")
.expect("must provide utf8 type encoding");
let compression = matches
.value_of("compression")
.expect("must provide compression");

let projection = projection.map(|x| {
let args = Args::parse();

let projection = args.projection.map(|x| {
x.split(',')
.map(|x| x.parse::<usize>().unwrap())
.collect::<Vec<_>>()
});

let (schema, _, batches) = read_gzip_json("1.0.0-littleendian", json_file)?;
let (schema, _, batches) = read_gzip_json("1.0.0-littleendian", &args.json)?;

let schema = if let Some(projection) = &projection {
let fields = schema
@@ -164,23 +165,10 @@ fn main() -> Result<()> {
batches
};

let version = if version == "1" {
Version::V1
} else {
Version::V2
};

let compression = match compression {
"uncompressed" => Compression::Uncompressed,
"zstd" => Compression::Zstd,
"snappy" => Compression::Snappy,
other => todo!("{}", other),
};

let options = WriteOptions {
write_statistics: true,
compression,
version,
compression: args.compression.into(),
version: args.version.into(),
};

let encodings = schema
@@ -189,7 +177,7 @@
.map(|x| match x.data_type() {
DataType::Dictionary(..) => Encoding::RleDictionary,
DataType::Utf8 | DataType::LargeUtf8 => {
if utf8_encoding == "delta" {
if args.encoding_utf8 == EncodingScheme::Delta {
Encoding::DeltaLengthByteArray
} else {
Encoding::Plain
Expand All @@ -202,7 +190,7 @@ fn main() -> Result<()> {
let row_groups =
RowGroupIterator::try_new(batches.into_iter().map(Ok), &schema, options, encodings)?;

let writer = File::create(write_path)?;
let writer = File::create(args.write_path)?;

let mut writer = FileWriter::try_new(writer, schema, options)?;

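For context on the enums introduced above: `#[derive(clap::ArgEnum)]` plus `arg_enum` on a field moves value validation into clap itself, which is what lets the hand-written `match` over compression strings be deleted. A small self-contained sketch of the pattern (enum and field names here are made up, not code from this commit):

```rust
// Standalone sketch of clap 3's ArgEnum; `Mode` and `Cli` are illustrative names.
use clap::{ArgEnum, Parser};

#[derive(ArgEnum, Debug, Clone, PartialEq)]
enum Mode {
    // Parsed from the literal "1" on the command line, mirroring the
    // `#[clap(name = "1")]` trick used for `Version::V1` above.
    #[clap(name = "1")]
    Fast,
    Thorough,
}

#[derive(Debug, Parser)]
struct Cli {
    // `arg_enum` tells the derive to parse this field via ArgEnum, so
    // `--mode 1` or `--mode thorough` are accepted and anything else is
    // rejected with a usage error listing the valid values.
    #[clap(long, arg_enum)]
    mode: Mode,
}

fn main() {
    let cli = Cli::parse();
    println!("{:?}", cli.mode);
}
```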
2 changes: 1 addition & 1 deletion integration-testing/Cargo.toml
@@ -31,7 +31,7 @@ logging = ["tracing-subscriber"]
arrow2 = { path = "../", features = ["io_ipc", "io_ipc_compression", "io_flight", "io_json_integration"] }
arrow-format = { version = "0.4", features = ["full"] }
async-trait = "0.1.41"
clap = "2.33"
clap = { version = "^3", features = ["derive"] }
futures = "0.3"
hex = "0.4"
prost = "0.9"
33 changes: 17 additions & 16 deletions integration-testing/src/bin/arrow-file-to-stream.rs
@@ -15,30 +15,31 @@
// specific language governing permissions and limitations
// under the License.

use std::env;
use arrow::error::Result;
use arrow::ipc::reader::FileReader;
use arrow::ipc::writer::StreamWriter;
use clap::Parser;
use std::fs::File;
use std::io::{self, BufReader};

use arrow2::error::Result;
use arrow2::io::ipc::read;
use arrow2::io::ipc::write;
#[derive(Debug, Parser)]
#[clap(author, version, about("Read an arrow file and stream to stdout"), long_about = None)]
struct Args {
file_name: String,
}

fn main() -> Result<()> {
let args: Vec<String> = env::args().collect();
let filename = &args[1];
let mut f = File::open(filename)?;
let metadata = read::read_file_metadata(&mut f)?;
let mut reader = read::FileReader::new(f, metadata.clone(), None);

let options = write::WriteOptions { compression: None };
let mut writer = write::StreamWriter::new(std::io::stdout(), options);

let fields = metadata.ipc_schema.fields.clone();
let args = Args::parse();
let f = File::open(&args.file_name)?;
let reader = BufReader::new(f);
let mut reader = FileReader::try_new(reader)?;
let schema = reader.schema();

writer.start(&metadata.schema, Some(fields))?;
let mut writer = StreamWriter::try_new(io::stdout(), &schema)?;

reader.try_for_each(|batch| {
let batch = batch?;
writer.write(&batch, None)
writer.write(&batch)
})?;
writer.finish()?;

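A side benefit of the derive style used above is that argument wiring can be exercised in-process via `Parser::try_parse_from`. A hedged sketch, with a stripped-down copy of the struct so it stands alone (illustrative, not code from this commit):

```rust
// Illustrative only: exercising a clap 3 Parser without spawning the binary.
use clap::Parser;

#[derive(Debug, Parser)]
struct Args {
    // A bare field with no attributes becomes a required positional argument.
    file_name: String,
}

fn main() {
    // Simulates `arrow-file-to-stream data.arrow`; the first element is argv[0].
    let args = Args::try_parse_from(["arrow-file-to-stream", "data.arrow"]).unwrap();
    assert_eq!(args.file_name, "data.arrow");

    // A missing positional argument becomes a parse error, not a panic.
    assert!(Args::try_parse_from(["arrow-file-to-stream"]).is_err());
}
```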