excel: parallelize command #1360

Merged · 1 commit · Oct 12, 2023
289 changes: 149 additions & 140 deletions src/cmd/excel.rs
@@ -84,30 +84,33 @@ Excel options:
Note that if a date format is invalid, qsv will fall back and
return the date as if no date-format was specified.
--range <range> An Excel format range, like C:T or C3:T25, to extract to the CSV.
-j, --jobs <arg> The number of jobs to run in parallel.
When not set, the number of jobs is set to the number of CPUs detected.

Common options:
-h, --help Display this message
-o, --output <file> Write output to <file> instead of stdout.
-d, --delimiter <arg> The delimiter to use when writing CSV data.
Must be a single character. [default: ,]
-p, --progressbar Show progress bar.
-Q, --quiet Do not display export summary message.
"#;

use std::{cmp, fmt::Write, path::PathBuf};

use calamine::{open_workbook_auto, DataType, Range, Reader, SheetType};
use indicatif::HumanCount;
#[cfg(any(feature = "feature_capable", feature = "lite"))]
use indicatif::{ProgressBar, ProgressDrawTarget};
use log::info;
use rayon::prelude::*;
use serde::{Deserialize, Serialize};

use crate::{
config::{Config, Delimiter},
util, CliResult,
};

// number of rows to process in each core/thread
const CHUNK_SIZE: usize = 1_000;
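
`CHUNK_SIZE` is the unit of work handed to each rayon task: the buffered rows are split with `par_chunks`, each chunk is converted to CSV records on a worker thread, and `collect_into_vec` reassembles the per-chunk results in their original order. A self-contained sketch of that shape, with plain strings standing in for the real cell conversion:

```rust
use rayon::prelude::*; // provides par_chunks(), map(), collect_into_vec()

const CHUNK_SIZE: usize = 1_000;

fn main() {
    // stand-in for the buffered worksheet rows
    let rows: Vec<u64> = (0..10_000).collect();

    let mut processed: Vec<Vec<String>> = Vec::new();
    rows.par_chunks(CHUNK_SIZE)           // one slice of up to 1_000 rows per task
        .map(|chunk| {
            // per-chunk scratch state lives entirely on the worker thread
            chunk.iter().map(|r| format!("row {r}")).collect::<Vec<String>>()
        })
        .collect_into_vec(&mut processed); // chunk results come back in input order

    // flatten in order, as the command does when writing CSV records
    assert_eq!(processed[3][0], "row 3000");
}
```

Keeping the scratch buffers inside the `map` closure is what lets the real code reuse its `csv::StringRecord`, `ryu`, and `itoa` buffers per chunk without any cross-thread sharing.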

#[derive(Deserialize)]
struct Args {
arg_input: String,
@@ -119,8 +122,8 @@ struct Args {
flag_delimiter: Option<Delimiter>,
flag_quiet: bool,
flag_date_format: Option<String>,
flag_progressbar: bool,
flag_range: String,
flag_jobs: Option<usize>,
}

#[derive(PartialEq)]
@@ -525,18 +528,6 @@ pub fn run(argv: &[&str]) -> CliResult<()> {

let (row_count, col_count) = range.get_size();

// prep progress bar
#[cfg(any(feature = "feature_capable", feature = "lite"))]
let show_progress = args.flag_progressbar || util::get_envvar_flag("QSV_PROGRESSBAR");
#[cfg(any(feature = "feature_capable", feature = "lite"))]
let progress = ProgressBar::with_draw_target(None, ProgressDrawTarget::stderr_with_hz(5));
#[cfg(any(feature = "feature_capable", feature = "lite"))]
if show_progress {
util::prep_progress(&progress, row_count as u64);
} else {
progress.set_draw_target(ProgressDrawTarget::hidden());
}

if row_count > 0 {
// there are rows to export
let mut rows_iter = range.rows();
@@ -583,155 +574,173 @@ pub fn run(argv: &[&str]) -> CliResult<()> {
info!("header: {record:?}");
wtr.write_record(&record)?;

#[cfg(any(feature = "feature_capable", feature = "lite"))]
if show_progress {
progress.inc(1);
}

// process the rest of the rows
// first, amortize allocs by declaring mutable vars we'll use
// in the main processing loop here
let mut cell_date_flag: bool = false;
let mut float_val = 0_f64;
let mut float_flag: bool = false;
let date_format = if let Some(df) = args.flag_date_format {
df
} else {
String::new()
};
let mut work_date;
let mut ryu_buffer = ryu::Buffer::new();
let mut itoa_buffer = itoa::Buffer::new();
let mut formatted_date = String::new();

// main processing loop
let mut rows = Vec::with_capacity(row_count);
let mut processed_rows: Vec<Vec<csv::StringRecord>> = Vec::with_capacity(row_count);

// process rest of the rows
for row in rows_iter {
#[cfg(any(feature = "feature_capable", feature = "lite"))]
if show_progress {
progress.inc(1);
}
record.clear();
for cell in row {
match *cell {
DataType::Empty => record.push_field(""),
DataType::String(ref s) => record.push_field(s),
DataType::Int(ref i) => record.push_field(itoa_buffer.format(*i)),
DataType::Float(ref f) => {
float_val = *f;
float_flag = true;
cell_date_flag = false;
},
DataType::DateTime(ref f) => {
float_val = *f;
float_flag = true;
cell_date_flag = true;
},
DataType::Error(ref e) => record.push_field(&format!("{e:?}")),
DataType::Bool(ref b) => record.push_field(&b.to_string()),
DataType::DateTimeIso(ref dt) => record.push_field(&dt.to_string()),
DataType::DurationIso(ref d) => record.push_field(&d.to_string()),
DataType::Duration(ref d) => record.push_field(ryu_buffer.format(*d)),
};
rows.push(row);
}

// Dates are stored as floats in Excel, so if its a float value, we need to check
// if its a date. If its a date, we need to convert it to a string using the
// specified date format. If its not a date, we need to convert it to a string
// using ryu unless its an integer, in which case we use itoa.
#[allow(clippy::cast_precision_loss)]
if float_flag {
if cell_date_flag {
// its a date, so convert it
work_date = if float_val.fract() > 0.0 {
// if it has a fractional part, then its a datetime
if let Some(dt) = cell.as_datetime() {
if date_format.is_empty() {
// no date format specified, so we'll just use the default
// format for the datetime
dt.to_string()
} else {
// a date format was specified, so we'll use it
(formatted_date).clear();
if write!(formatted_date, "{}", dt.format(&date_format)).is_ok()
{
// the format string was ok, so use to_string() to
// actually apply the DelayedFormat
formatted_date.to_string()
// set RAYON_NUM_THREADS
util::njobs(args.flag_jobs);

rows.par_chunks(CHUNK_SIZE)
.map(|chunk| {
let mut record = csv::StringRecord::with_capacity(500, col_count);
let mut trimmed_record = csv::StringRecord::with_capacity(500, col_count);
let mut cell_date_flag: bool = false;
let mut float_val = 0_f64;
let mut float_flag: bool = false;
let mut work_date;
let mut ryu_buffer = ryu::Buffer::new();
let mut itoa_buffer = itoa::Buffer::new();
let mut formatted_date = String::new();

let mut processed_chunk: Vec<csv::StringRecord> = Vec::with_capacity(CHUNK_SIZE);

for row in chunk {
for cell in *row {
match *cell {
DataType::Empty => record.push_field(""),
DataType::String(ref s) => record.push_field(s),
DataType::Int(ref i) => record.push_field(itoa_buffer.format(*i)),
DataType::Float(ref f) => {
float_val = *f;
float_flag = true;
cell_date_flag = false;
},
DataType::DateTime(ref f) => {
float_val = *f;
float_flag = true;
cell_date_flag = true;
},
DataType::Error(ref e) => record.push_field(&format!("{e:?}")),
DataType::Bool(ref b) => record.push_field(&b.to_string()),
DataType::DateTimeIso(ref dt) => record.push_field(&dt.to_string()),
DataType::DurationIso(ref d) => record.push_field(&d.to_string()),
DataType::Duration(ref d) => record.push_field(ryu_buffer.format(*d)),
};

#[allow(clippy::cast_precision_loss)]
if float_flag {
if cell_date_flag {
// its a date, so convert it
work_date = if float_val.fract() > 0.0 {
// if it has a fractional part, then its a datetime
if let Some(dt) = cell.as_datetime() {
if date_format.is_empty() {
// no date format specified, so we'll just use the
// default
// format for the datetime
dt.to_string()
} else {
// a date format was specified, so we'll use it
(formatted_date).clear();
if write!(formatted_date, "{}", dt.format(&date_format))
.is_ok()
{
// the format string was ok, so use to_string()
// to
// actually apply the DelayedFormat
formatted_date.to_string()
} else {
// if there was a format error, revert to the
// default format
dt.to_string()
}
}
} else {
// if there was a format error, revert to the default format
dt.to_string()
format!("ERROR: Cannot convert {float_val} to datetime")
}
}
} else if let Some(d) = cell.as_date() {
// if it has no fractional part and calamine can return it
// as_date
// then its a date
if date_format.is_empty() {
d.to_string()
} else {
formatted_date.clear();
if write!(formatted_date, "{}", d.format(&date_format))
.is_ok()
{
formatted_date.to_string()
} else {
d.to_string()
}
}
} else {
format!("ERROR: Cannot convert {float_val} to date")
};
record.push_field(&work_date);
// its not a date, so just push the ryu-formatted float value if its
// not an integer or the candidate
// integer is too big or too small to be an i64
} else if float_val.fract().abs() > 0.0
|| float_val > i64::MAX as f64
|| float_val < i64::MIN as f64
{
record.push_field(ryu_buffer.format_finite(float_val));
} else {
format!("ERROR: Cannot convert {float_val} to datetime")
// its an i64 integer. We can't use ryu to format it, because it
// will be formatted as a
// float (have a ".0"). So we use itoa.
record.push_field(itoa_buffer.format(float_val as i64));
}
} else if let Some(d) = cell.as_date() {
// if it has no fractional part and calamine can return it as_date
// then its a date
if date_format.is_empty() {
d.to_string()
// reset the float flag
float_flag = false;
}
}

if trim {
record.trim();
record.iter().for_each(|field| {
if field.contains('\n') {
trimmed_record.push_field(&field.to_string().replace('\n', " "));
} else {
formatted_date.clear();
if write!(formatted_date, "{}", d.format(&date_format)).is_ok() {
formatted_date.to_string()
} else {
d.to_string()
}
trimmed_record.push_field(field);
}
} else {
format!("ERROR: Cannot convert {float_val} to date")
};
record.push_field(&work_date);
// its not a date, so just push the ryu-formatted float value if its not an
// integer or the candidate integer is too big or too small to be an i64
} else if float_val.fract().abs() > 0.0
|| float_val > i64::MAX as f64
|| float_val < i64::MIN as f64
{
record.push_field(ryu_buffer.format_finite(float_val));
} else {
// its an i64 integer. We can't use ryu to format it, because it will
// be formatted as a float (have a ".0"). So we use itoa.
record.push_field(itoa_buffer.format(float_val as i64));
});
record.clone_from(&trimmed_record);
trimmed_record.clear();
}
// reset the float flag
float_flag = false;
}
}

if trim {
record.trim();
trimmed_record.clear();
record.iter().for_each(|field| {
if field.contains('\n') {
trimmed_record.push_field(&field.to_string().replace('\n', " "));
} else {
trimmed_record.push_field(field);
}
});
record.clone_from(&trimmed_record);
processed_chunk.push(record.clone());
record.clear();
}
processed_chunk
})
.collect_into_vec(&mut processed_rows);
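
Within each chunk, `--trim` handling does more than `csv::StringRecord::trim()`: any embedded newlines inside a field are also flattened to spaces before the record is kept. A small sketch of that per-record step using only the csv crate (a simplified stand-in for the chunk-closure logic above):

```rust
use csv::StringRecord;

/// Trim whitespace from every field and flatten embedded newlines to spaces,
/// mirroring the per-record handling in the chunk closure above.
fn trim_record(record: &mut StringRecord) {
    record.trim(); // csv's built-in leading/trailing whitespace trim on all fields
    let mut trimmed = StringRecord::new();
    for field in record.iter() {
        if field.contains('\n') {
            trimmed.push_field(&field.replace('\n', " "));
        } else {
            trimmed.push_field(field);
        }
    }
    *record = trimmed;
}

fn main() {
    let mut rec = StringRecord::from(vec!["  a  ", "line1\nline2"]);
    trim_record(&mut rec);
    assert_eq!(&rec[0], "a");
    assert_eq!(&rec[1], "line1 line2");
}
```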

// rayon collect() guarantees original order,
// so we can just write results for each chunk in order
for processed_chunk in processed_rows {
for processed_row in processed_chunk {
wtr.write_record(&processed_row)?;
}
wtr.write_record(&record)?;
} // end of main processing loop
}
} else {
return fail_clierror!("\"{sheet}\" sheet is empty.");
}
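
For numeric cells, the chunk loop above picks one of three renderings: an Excel serial date/datetime (converted through calamine's `as_datetime()`/`as_date()` plus the optional `--date-format`), an integer-valued float (formatted with `itoa` so it does not gain a trailing ".0"), or a genuine float (formatted with `ryu`). A condensed sketch of just that decision, with the calamine date conversion replaced by a placeholder:

```rust
// Sketch of the numeric-cell formatting rule used in the loop above.
// `is_date` stands in for cell_date_flag; real date conversion goes through
// calamine's as_datetime()/as_date(), which is not reproduced here.
fn format_numeric_cell(float_val: f64, is_date: bool) -> String {
    let mut ryu_buffer = ryu::Buffer::new();
    let mut itoa_buffer = itoa::Buffer::new();

    if is_date {
        // placeholder: the command converts the serial number via calamine
        // and chrono's format() before pushing the field
        format!("<date serial {float_val}>")
    } else if float_val.fract().abs() > 0.0
        || float_val > i64::MAX as f64
        || float_val < i64::MIN as f64
    {
        // a genuine float (or out of i64 range): ryu gives a short,
        // round-trippable representation
        ryu_buffer.format_finite(float_val).to_string()
    } else {
        // an integer-valued float: itoa avoids the ".0" suffix ryu would add
        itoa_buffer.format(float_val as i64).to_string()
    }
}
```

The integer branch exists because ryu always renders a whole-number f64 with a fractional part (e.g. "3.0"), which is why the command falls back to itoa there.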

wtr.flush()?;

let end_msg = format!(
"{} {}-column rows exported from \"{sheet}\" sheet",
HumanCount(row_count.saturating_sub(1) as u64),
HumanCount(col_count as u64),
);

#[cfg(any(feature = "feature_capable", feature = "lite"))]
if show_progress {
progress.set_message(end_msg.clone());
util::finish_progress(&progress);
}

if !args.flag_quiet {
winfo!("{end_msg}");
winfo!(
"{}",
format!(
"{} {}-column rows exported from \"{sheet}\" sheet",
HumanCount(row_count.saturating_sub(1) as u64),
HumanCount(col_count as u64),
)
);
}

Ok(())
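
The export summary relies on `indicatif::HumanCount` (imported at the top of the file) to add thousands separators when a count is displayed. For instance:

```rust
use indicatif::HumanCount;

fn main() {
    // HumanCount implements Display with thousands separators
    println!("{} rows exported", HumanCount(1_234_567)); // "1,234,567 rows exported"
}
```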