Skip to content

Commit

Permalink
fix: revert join optimization that actually resulted in a perf regr…
Browse files Browse the repository at this point in the history
…ession per the benchmarks

attempted to minimize allocs and what I thought would be temporary iterators and vectors when using `chain()`.
you're not smarter than the optimizer
Also:
- documented central ValueIndex new() fn
- optimized --ignore-case processing
  • Loading branch information
jqnatividad committed Jan 9, 2025
1 parent 49ec4e2 commit e42af2b
Showing 1 changed file with 52 additions and 44 deletions.
96 changes: 52 additions & 44 deletions src/cmd/join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -225,8 +225,7 @@ impl<R: io::Read + io::Seek, W: io::Write> IoState<R, W> {
let mut scratch = csv::ByteRecord::new();
let mut validx = ValueIndex::new(self.rdr2, &self.sel2, self.casei, self.nulls)?;
let mut row = csv::ByteRecord::new();
let mut key: Vec<ByteString>;
let mut output = csv::ByteRecord::new();
let mut key;

while self.rdr1.read_byte_record(&mut row)? {
key = get_row_key(&self.sel1, &row, self.casei);
Expand All @@ -238,10 +237,8 @@ impl<R: io::Read + io::Seek, W: io::Write> IoState<R, W> {

validx.idx.read_byte_record(&mut scratch)?;

output.clear();
output.extend(&row);
output.extend(&scratch);
self.wtr.write_record(&output)?;
let combined = row.iter().chain(scratch.iter());
self.wtr.write_record(combined)?;
}
}
}
Expand All @@ -260,8 +257,7 @@ impl<R: io::Read + io::Seek, W: io::Write> IoState<R, W> {
let (_, pad2) = self.get_padding()?;
let mut validx = ValueIndex::new(self.rdr2, &self.sel2, self.casei, self.nulls)?;
let mut row = csv::ByteRecord::new();
let mut key: Vec<ByteString>;
let mut output = csv::ByteRecord::new();
let mut key;

while self.rdr1.read_byte_record(&mut row)? {
key = get_row_key(&self.sel1, &row, self.casei);
Expand All @@ -270,27 +266,18 @@ impl<R: io::Read + io::Seek, W: io::Write> IoState<R, W> {

for &rowi in rows {
validx.idx.seek(rowi as u64)?;
let row1 = row.iter();
validx.idx.read_byte_record(&mut scratch)?;
output.clear();
if right {
output.extend(&scratch);
output.extend(&row);
self.wtr.write_record(scratch.iter().chain(row1))?;
} else {
output.extend(&row);
output.extend(&scratch);
self.wtr.write_record(row1.chain(&scratch))?;
}
self.wtr.write_record(&output)?;
}
} else if right {
self.wtr.write_record(pad2.iter().chain(&row))?;
} else {
output.clear();
if right {
output.extend(&pad2);
output.extend(&row);
} else {
output.extend(&row);
output.extend(&pad2);
}
self.wtr.write_record(&output)?;
self.wtr.write_record(row.iter().chain(&pad2))?;
}
}
self.wtr.flush()?;
Expand All @@ -301,7 +288,7 @@ impl<R: io::Read + io::Seek, W: io::Write> IoState<R, W> {
fn left_join(mut self, anti: bool) -> CliResult<()> {
let validx = ValueIndex::new(self.rdr2, &self.sel2, self.casei, self.nulls)?;
let mut row = csv::ByteRecord::new();
let mut key: Vec<ByteString>;
let mut key;

while self.rdr1.read_byte_record(&mut row)? {
key = get_row_key(&self.sel1, &row, self.casei);
Expand All @@ -324,12 +311,11 @@ impl<R: io::Read + io::Seek, W: io::Write> IoState<R, W> {
let mut scratch = csv::ByteRecord::new();
let (pad1, pad2) = self.get_padding()?;
let mut validx = ValueIndex::new(self.rdr2, &self.sel2, self.casei, self.nulls)?;
let mut output = csv::ByteRecord::new();

// Keep track of which rows we've written from rdr2.
let mut rdr2_written: Vec<_> = repeat(false).take(validx.num_rows).collect();
let mut row1 = csv::ByteRecord::new();
let mut key: Vec<ByteString>;
let mut key;

while self.rdr1.read_byte_record(&mut row1)? {
key = get_row_key(&self.sel1, &row1, self.casei);
Expand All @@ -341,16 +327,10 @@ impl<R: io::Read + io::Seek, W: io::Write> IoState<R, W> {

validx.idx.seek(rowi as u64)?;
validx.idx.read_byte_record(&mut scratch)?;
output.clear();
output.extend(&row1);
output.extend(&scratch);
self.wtr.write_record(&output)?;
self.wtr.write_record(row1.iter().chain(&scratch))?;
}
} else {
output.clear();
output.extend(&row1);
output.extend(&pad2);
self.wtr.write_record(&output)?;
self.wtr.write_record(row1.iter().chain(&pad2))?;
}
}

Expand All @@ -360,10 +340,7 @@ impl<R: io::Read + io::Seek, W: io::Write> IoState<R, W> {
if !written {
validx.idx.seek(i as u64)?;
validx.idx.read_byte_record(&mut scratch)?;
output.clear();
output.extend(&pad1);
output.extend(&scratch);
self.wtr.write_record(&output)?;
self.wtr.write_record(pad1.iter().chain(&scratch))?;
}
}
self.wtr.flush()?;
Expand All @@ -376,7 +353,6 @@ impl<R: io::Read + io::Seek, W: io::Write> IoState<R, W> {
pos.set_byte(0);
let mut row2 = csv::ByteRecord::new();
let mut row1 = csv::ByteRecord::new();
let mut output = csv::ByteRecord::new();
let rdr2_has_headers = self.rdr2.has_headers();
while self.rdr1.read_byte_record(&mut row1)? {
self.rdr2.seek(pos.clone())?;
Expand All @@ -386,10 +362,7 @@ impl<R: io::Read + io::Seek, W: io::Write> IoState<R, W> {
self.rdr2.read_byte_record(&mut row2)?;
}
while self.rdr2.read_byte_record(&mut row2)? {
output.clear();
output.extend(&row1);
output.extend(&row2);
self.wtr.write_record(&output)?;
self.wtr.write_record(row1.iter().chain(&row2))?;
}
}
Ok(self.wtr.flush()?)
Expand Down Expand Up @@ -479,6 +452,31 @@ struct ValueIndex<R> {
}

impl<R: io::Read + io::Seek> ValueIndex<R> {
/// Creates a new ValueIndex by reading a CSV and building indexes for both row positions and values.
///
/// This function reads through a CSV file once to build two indexes:
/// 1. A mapping of selected column values to the row numbers where they appear
/// 2. A byte offset index for random access to rows in the CSV
///
/// # Arguments
///
/// * `rdr` - A CSV reader that implements Read + Seek
/// * `sel` - A Selection that specifies which columns to index
/// * `casei` - If true, values are compared case-insensitively
/// * `nulls` - If true, rows with empty values are included in the index
///
/// # Returns
///
/// Returns a ValueIndex containing:
/// * `values` - HashMap mapping column values to row numbers
/// * `idx` - Indexed CSV reader for random access
/// * `num_rows` - Total number of data rows processed
///
/// # Notes
///
/// - Header rows are included in the byte offset index but not the value index
/// - Values are trimmed and optionally converted to lowercase before indexing
/// - Rows with empty values are skipped unless nulls=true
fn new(
mut rdr: csv::Reader<R>,
sel: &Selection,
Expand Down Expand Up @@ -515,7 +513,17 @@ impl<R: io::Read + io::Seek> ValueIndex<R> {

let fields: Vec<_> = sel
.select(&row)
.map(|v| util::transform(v, casei))
.map(|v| {
if let Ok(s) = simdutf8::basic::from_utf8(v) {
if casei {
s.trim().to_lowercase().into_bytes()
} else {
s.trim().as_bytes().to_vec()
}
} else {
v.to_vec()
}
})
.collect();
if nulls || !fields.iter().any(std::vec::Vec::is_empty) {
match val_idx.entry(fields) {
Expand Down

0 comments on commit e42af2b

Please sign in to comment.