Skip to content

Commit

Permalink
Merge pull request #1360 from jqnatividad/1353-excel-parallelize
Browse files Browse the repository at this point in the history
`excel`: parallelize command
  • Loading branch information
jqnatividad authored Oct 12, 2023
2 parents 7d5bd0f + e3e0a1d commit fc78e7b
Showing 1 changed file with 149 additions and 140 deletions.
289 changes: 149 additions & 140 deletions src/cmd/excel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,30 +84,33 @@ Excel options:
Note that if a date format is invalid, qsv will fall back and
return the date as if no date-format was specified.
--range <range> An Excel format range, like C:T or C3:T25, to extract to the CSV.
-j, --jobs <arg> The number of jobs to run in parallel.
When not set, the number of jobs is set to the number of CPUs detected.
Common options:
-h, --help Display this message
-o, --output <file> Write output to <file> instead of stdout.
-d, --delimiter <arg> The delimiter to use when writing CSV data.
Must be a single character. [default: ,]
-p, --progressbar Show progress bar.
-Q, --quiet Do not display export summary message.
"#;

use std::{cmp, fmt::Write, path::PathBuf};

use calamine::{open_workbook_auto, DataType, Range, Reader, SheetType};
use indicatif::HumanCount;
#[cfg(any(feature = "feature_capable", feature = "lite"))]
use indicatif::{ProgressBar, ProgressDrawTarget};
use log::info;
use rayon::prelude::*;
use serde::{Deserialize, Serialize};

use crate::{
config::{Config, Delimiter},
util, CliResult,
};

// number of rows to process in each core/thread
const CHUNK_SIZE: usize = 1_000;

#[derive(Deserialize)]
struct Args {
arg_input: String,
Expand All @@ -119,8 +122,8 @@ struct Args {
flag_delimiter: Option<Delimiter>,
flag_quiet: bool,
flag_date_format: Option<String>,
flag_progressbar: bool,
flag_range: String,
flag_jobs: Option<usize>,
}

#[derive(PartialEq)]
Expand Down Expand Up @@ -525,18 +528,6 @@ pub fn run(argv: &[&str]) -> CliResult<()> {

let (row_count, col_count) = range.get_size();

// prep progress bar
#[cfg(any(feature = "feature_capable", feature = "lite"))]
let show_progress = args.flag_progressbar || util::get_envvar_flag("QSV_PROGRESSBAR");
#[cfg(any(feature = "feature_capable", feature = "lite"))]
let progress = ProgressBar::with_draw_target(None, ProgressDrawTarget::stderr_with_hz(5));
#[cfg(any(feature = "feature_capable", feature = "lite"))]
if show_progress {
util::prep_progress(&progress, row_count as u64);
} else {
progress.set_draw_target(ProgressDrawTarget::hidden());
}

if row_count > 0 {
// there are rows to export
let mut rows_iter = range.rows();
Expand Down Expand Up @@ -583,155 +574,173 @@ pub fn run(argv: &[&str]) -> CliResult<()> {
info!("header: {record:?}");
wtr.write_record(&record)?;

#[cfg(any(feature = "feature_capable", feature = "lite"))]
if show_progress {
progress.inc(1);
}

// process the rest of the rows
// first, amortize allocs by declaring mutable vars we'll use
// in the main processing loop here
let mut cell_date_flag: bool = false;
let mut float_val = 0_f64;
let mut float_flag: bool = false;
let date_format = if let Some(df) = args.flag_date_format {
df
} else {
String::new()
};
let mut work_date;
let mut ryu_buffer = ryu::Buffer::new();
let mut itoa_buffer = itoa::Buffer::new();
let mut formatted_date = String::new();

// main processing loop
let mut rows = Vec::with_capacity(row_count);
let mut processed_rows: Vec<Vec<csv::StringRecord>> = Vec::with_capacity(row_count);

// process rest of the rows
for row in rows_iter {
#[cfg(any(feature = "feature_capable", feature = "lite"))]
if show_progress {
progress.inc(1);
}
record.clear();
for cell in row {
match *cell {
DataType::Empty => record.push_field(""),
DataType::String(ref s) => record.push_field(s),
DataType::Int(ref i) => record.push_field(itoa_buffer.format(*i)),
DataType::Float(ref f) => {
float_val = *f;
float_flag = true;
cell_date_flag = false;
},
DataType::DateTime(ref f) => {
float_val = *f;
float_flag = true;
cell_date_flag = true;
},
DataType::Error(ref e) => record.push_field(&format!("{e:?}")),
DataType::Bool(ref b) => record.push_field(&b.to_string()),
DataType::DateTimeIso(ref dt) => record.push_field(&dt.to_string()),
DataType::DurationIso(ref d) => record.push_field(&d.to_string()),
DataType::Duration(ref d) => record.push_field(ryu_buffer.format(*d)),
};
rows.push(row);
}

// Dates are stored as floats in Excel, so if its a float value, we need to check
// if its a date. If its a date, we need to convert it to a string using the
// specified date format. If its not a date, we need to convert it to a string
// using ryu unless its an integer, in which case we use itoa.
#[allow(clippy::cast_precision_loss)]
if float_flag {
if cell_date_flag {
// its a date, so convert it
work_date = if float_val.fract() > 0.0 {
// if it has a fractional part, then its a datetime
if let Some(dt) = cell.as_datetime() {
if date_format.is_empty() {
// no date format specified, so we'll just use the default
// format for the datetime
dt.to_string()
} else {
// a date format was specified, so we'll use it
(formatted_date).clear();
if write!(formatted_date, "{}", dt.format(&date_format)).is_ok()
{
// the format string was ok, so use to_string() to
// actually apply the DelayedFormat
formatted_date.to_string()
// set RAYON_NUM_THREADS
util::njobs(args.flag_jobs);

rows.par_chunks(CHUNK_SIZE)
.map(|chunk| {
let mut record = csv::StringRecord::with_capacity(500, col_count);
let mut trimmed_record = csv::StringRecord::with_capacity(500, col_count);
let mut cell_date_flag: bool = false;
let mut float_val = 0_f64;
let mut float_flag: bool = false;
let mut work_date;
let mut ryu_buffer = ryu::Buffer::new();
let mut itoa_buffer = itoa::Buffer::new();
let mut formatted_date = String::new();

let mut processed_chunk: Vec<csv::StringRecord> = Vec::with_capacity(CHUNK_SIZE);

for row in chunk {
for cell in *row {
match *cell {
DataType::Empty => record.push_field(""),
DataType::String(ref s) => record.push_field(s),
DataType::Int(ref i) => record.push_field(itoa_buffer.format(*i)),
DataType::Float(ref f) => {
float_val = *f;
float_flag = true;
cell_date_flag = false;
},
DataType::DateTime(ref f) => {
float_val = *f;
float_flag = true;
cell_date_flag = true;
},
DataType::Error(ref e) => record.push_field(&format!("{e:?}")),
DataType::Bool(ref b) => record.push_field(&b.to_string()),
DataType::DateTimeIso(ref dt) => record.push_field(&dt.to_string()),
DataType::DurationIso(ref d) => record.push_field(&d.to_string()),
DataType::Duration(ref d) => record.push_field(ryu_buffer.format(*d)),
};

#[allow(clippy::cast_precision_loss)]
if float_flag {
if cell_date_flag {
// its a date, so convert it
work_date = if float_val.fract() > 0.0 {
// if it has a fractional part, then its a datetime
if let Some(dt) = cell.as_datetime() {
if date_format.is_empty() {
// no date format specified, so we'll just use the
// default
// format for the datetime
dt.to_string()
} else {
// a date format was specified, so we'll use it
(formatted_date).clear();
if write!(formatted_date, "{}", dt.format(&date_format))
.is_ok()
{
// the format string was ok, so use to_string()
// to
// actually apply the DelayedFormat
formatted_date.to_string()
} else {
// if there was a format error, revert to the
// default format
dt.to_string()
}
}
} else {
// if there was a format error, revert to the default format
dt.to_string()
format!("ERROR: Cannot convert {float_val} to datetime")
}
}
} else if let Some(d) = cell.as_date() {
// if it has no fractional part and calamine can return it
// as_date
// then its a date
if date_format.is_empty() {
d.to_string()
} else {
formatted_date.clear();
if write!(formatted_date, "{}", d.format(&date_format))
.is_ok()
{
formatted_date.to_string()
} else {
d.to_string()
}
}
} else {
format!("ERROR: Cannot convert {float_val} to date")
};
record.push_field(&work_date);
// its not a date, so just push the ryu-formatted float value if its
// not an integer or the candidate
// integer is too big or too small to be an i64
} else if float_val.fract().abs() > 0.0
|| float_val > i64::MAX as f64
|| float_val < i64::MIN as f64
{
record.push_field(ryu_buffer.format_finite(float_val));
} else {
format!("ERROR: Cannot convert {float_val} to datetime")
// its an i64 integer. We can't use ryu to format it, because it
// will be formatted as a
// float (have a ".0"). So we use itoa.
record.push_field(itoa_buffer.format(float_val as i64));
}
} else if let Some(d) = cell.as_date() {
// if it has no fractional part and calamine can return it as_date
// then its a date
if date_format.is_empty() {
d.to_string()
// reset the float flag
float_flag = false;
}
}

if trim {
record.trim();
record.iter().for_each(|field| {
if field.contains('\n') {
trimmed_record.push_field(&field.to_string().replace('\n', " "));
} else {
formatted_date.clear();
if write!(formatted_date, "{}", d.format(&date_format)).is_ok() {
formatted_date.to_string()
} else {
d.to_string()
}
trimmed_record.push_field(field);
}
} else {
format!("ERROR: Cannot convert {float_val} to date")
};
record.push_field(&work_date);
// its not a date, so just push the ryu-formatted float value if its not an
// integer or the candidate integer is too big or too small to be an i64
} else if float_val.fract().abs() > 0.0
|| float_val > i64::MAX as f64
|| float_val < i64::MIN as f64
{
record.push_field(ryu_buffer.format_finite(float_val));
} else {
// its an i64 integer. We can't use ryu to format it, because it will
// be formatted as a float (have a ".0"). So we use itoa.
record.push_field(itoa_buffer.format(float_val as i64));
});
record.clone_from(&trimmed_record);
trimmed_record.clear();
}
// reset the float flag
float_flag = false;
}
}

if trim {
record.trim();
trimmed_record.clear();
record.iter().for_each(|field| {
if field.contains('\n') {
trimmed_record.push_field(&field.to_string().replace('\n', " "));
} else {
trimmed_record.push_field(field);
}
});
record.clone_from(&trimmed_record);
processed_chunk.push(record.clone());
record.clear();
}
processed_chunk
})
.collect_into_vec(&mut processed_rows);

// rayon collect() guarantees original order,
// so we can just write results for each chunk in order
for processed_chunk in processed_rows {
for processed_row in processed_chunk {
wtr.write_record(&processed_row)?;
}
wtr.write_record(&record)?;
} // end of main processing loop
}
} else {
return fail_clierror!("\"{sheet}\" sheet is empty.");
}

wtr.flush()?;

let end_msg = format!(
"{} {}-column rows exported from \"{sheet}\" sheet",
HumanCount(row_count.saturating_sub(1) as u64),
HumanCount(col_count as u64),
);

#[cfg(any(feature = "feature_capable", feature = "lite"))]
if show_progress {
progress.set_message(end_msg.clone());
util::finish_progress(&progress);
}

if !args.flag_quiet {
winfo!("{end_msg}");
winfo!(
"{}",
format!(
"{} {}-column rows exported from \"{sheet}\" sheet",
HumanCount(row_count.saturating_sub(1) as u64),
HumanCount(col_count as u64),
)
);
}

Ok(())
Expand Down

0 comments on commit fc78e7b

Please sign in to comment.