perf: Speedup ndjson reader ~40% #18197

Merged · 3 commits · Aug 15, 2024
crates/polars-io/Cargo.toml (3 changes: 1 addition & 2 deletions)
@@ -40,7 +40,7 @@ regex = { workspace = true }
 reqwest = { workspace = true, optional = true }
 ryu = { workspace = true, optional = true }
 serde = { workspace = true, features = ["rc"], optional = true }
-serde_json = { version = "1", default-features = false, features = ["alloc", "raw_value", "std"], optional = true }
+serde_json = { version = "1", default-features = false, features = ["alloc"], optional = true }
 simd-json = { workspace = true, optional = true }
 simdutf8 = { workspace = true, optional = true }
 smartstring = { workspace = true }
@@ -63,7 +63,6 @@ json = [
   "polars-json",
   "simd-json",
   "atoi_simd",
-  "serde_json",
   "dtype-struct",
   "csv",
 ]
crates/polars-io/src/ndjson/core.rs (87 changes: 42 additions & 45 deletions)
@@ -16,7 +16,6 @@ use crate::predicates::PhysicalIoExpr;
 use crate::prelude::*;
 use crate::RowIndex;
 const NEWLINE: u8 = b'\n';
-const RETURN: u8 = b'\r';
 const CLOSING_BRACKET: u8 = b'}';

 #[must_use]
@@ -259,8 +258,7 @@ impl<'a> CoreJsonReader<'a> {

         let iter = file_chunks.par_iter().map(|(start_pos, stop_at_nbytes)| {
             let bytes = &bytes[*start_pos..*stop_at_nbytes];
-            let iter = serde_json::Deserializer::from_slice(bytes)
-                .into_iter::<Box<serde_json::value::RawValue>>();
+            let iter = json_lines(bytes);
             iter.count()
         });
         Ok(POOL.install(|| iter.sum()))
@@ -366,56 +364,55 @@
 fn parse_impl(
     bytes: &[u8],
     buffers: &mut PlIndexMap<BufferKey, Buffer>,
-    scratch: &mut Vec<u8>,
+    scratch: &mut Scratch,
 ) -> PolarsResult<usize> {
-    scratch.clear();
-    scratch.extend_from_slice(bytes);
-    let n = scratch.len();
-    let all_good = match n {
-        0 => true,
-        1 => scratch[0] == NEWLINE,
-        2 => scratch[0] == NEWLINE && scratch[1] == RETURN,
-        _ => {
-            let value: simd_json::BorrowedValue = simd_json::to_borrowed_value(scratch)
-                .map_err(|e| polars_err!(ComputeError: "error parsing line: {}", e))?;
-            match value {
-                simd_json::BorrowedValue::Object(value) => {
-                    buffers.iter_mut().try_for_each(|(s, inner)| {
-                        match s.0.map_lookup(&value) {
-                            Some(v) => inner.add(v)?,
-                            None => inner.add_null(),
-                        }
-                        PolarsResult::Ok(())
-                    })?;
-                },
-                _ => {
-                    buffers.iter_mut().for_each(|(_, inner)| inner.add_null());
-                },
-            };
-            true
-        },
-    };
-    polars_ensure!(all_good, ComputeError: "invalid JSON: unexpected end of file");
+    scratch.json.clear();
+    scratch.json.extend_from_slice(bytes);
+    let n = scratch.json.len();
+    let value = simd_json::to_borrowed_value_with_buffers(&mut scratch.json, &mut scratch.buffers)
+        .map_err(|e| polars_err!(ComputeError: "error parsing line: {}", e))?;
+    match value {
+        simd_json::BorrowedValue::Object(value) => {
+            buffers.iter_mut().try_for_each(|(s, inner)| {
+                match s.0.map_lookup(&value) {
+                    Some(v) => inner.add(v)?,
+                    None => inner.add_null(),
+                }
+                PolarsResult::Ok(())
+            })?;
+        },
+        _ => {
+            buffers.iter_mut().for_each(|(_, inner)| inner.add_null());
+        },
+    };
     Ok(n)
 }

+#[derive(Default)]
+struct Scratch {
+    json: Vec<u8>,
+    buffers: simd_json::Buffers,
+}
+
+fn json_lines(bytes: &[u8]) -> impl Iterator<Item = &[u8]> {
+    // This previously used `serde_json`'s `RawValue` to deserialize chunks without really deserializing them.
+    // However, this convenience comes at a cost. serde_json allocates and parses and does UTF-8 validation, all
+    // things we don't need since we use simd_json for them. Also, `serde_json::StreamDeserializer` has a more
+    // ambitious goal: it wants to parse potentially *non-delimited* sequences of JSON values, while we know
+    // our values are line-delimited. Turns out, custom splitting is very easy, and gives a very nice performance boost.
+    bytes.split(|&byte| byte == b'\n').filter(|&bytes| {
+        bytes
+            .iter()
+            .any(|&byte| !matches!(byte, b' ' | b'\t' | b'\r'))
+    })
+}
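
Illustration only, not part of the diff: a self-contained copy of the new splitter run on a made-up buffer, to show which lines it keeps. Empty and whitespace-only lines are dropped (so they are neither counted nor parsed), while a trailing `\r` from CRLF input stays on the line that is kept.

```rust
// Standalone copy of the `json_lines` logic above, for illustration.
fn json_lines(bytes: &[u8]) -> impl Iterator<Item = &[u8]> {
    bytes.split(|&byte| byte == b'\n').filter(|&line| {
        // Keep a line only if it contains something besides spaces, tabs and `\r`.
        line.iter()
            .any(|&byte| !matches!(byte, b' ' | b'\t' | b'\r'))
    })
}

fn main() {
    // Made-up NDJSON input: CRLF endings, one empty line, one whitespace-only line.
    let input = b"{\"a\": 1}\r\n\r\n{\"a\": 2}\n   \n{\"a\": 3}";
    let lines: Vec<&[u8]> = json_lines(input).collect();

    // Only the three object lines survive.
    assert_eq!(lines.len(), 3);
    // A kept line may still end in `\r`; it is handed to the JSON parser as-is.
    assert_eq!(lines[0], &b"{\"a\": 1}\r"[..]);
}
```

The same iterator drives both the parallel row count earlier in this diff and `parse_lines` below, so the two paths agree on what counts as a line.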

 fn parse_lines(bytes: &[u8], buffers: &mut PlIndexMap<BufferKey, Buffer>) -> PolarsResult<()> {
-    let mut buf = vec![];
-
-    // The `RawValue` is a pointer to the original JSON string and does not perform any deserialization.
-    // It is used to properly iterate over the lines without re-implementing the splitlines logic when this does the same thing.
-    let iter =
-        serde_json::Deserializer::from_slice(bytes).into_iter::<Box<serde_json::value::RawValue>>();
-    for value_result in iter {
-        match value_result {
-            Ok(value) => {
-                let bytes = value.get().as_bytes();
-                parse_impl(bytes, buffers, &mut buf)?;
-            },
-            Err(e) => {
-                polars_bail!(ComputeError: "error parsing ndjson {}", e)
-            },
-        }
+    let mut scratch = Scratch::default();
+
+    let iter = json_lines(bytes);
+    for bytes in iter {
+        parse_impl(bytes, buffers, &mut scratch)?;
     }
     Ok(())
 }
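
Also illustration only: the buffer-reuse pattern that `Scratch` enables, sketched outside the crate. The `Scratch` struct mirrors the one in the diff; the input lines are made up, and `simd-json` is assumed to be pulled in as a direct dependency. One byte buffer and one set of simd-json scratch buffers serve every line, so the per-line work is a copy plus the in-place SIMD parse instead of fresh allocations.

```rust
// Sketch of per-line parsing with reused buffers; assumes `simd-json` as a dependency.
use simd_json::BorrowedValue;

#[derive(Default)]
struct Scratch {
    json: Vec<u8>,               // reusable copy of the current line (simd-json parses in place)
    buffers: simd_json::Buffers, // reusable internal scratch space for the parser
}

fn main() {
    let mut scratch = Scratch::default();
    // Made-up lines; in the reader these come from `json_lines`.
    for line in [&b"{\"x\": 1}"[..], &b"{\"x\": 2}"[..]] {
        scratch.json.clear();
        scratch.json.extend_from_slice(line);
        let value: BorrowedValue =
            simd_json::to_borrowed_value_with_buffers(&mut scratch.json, &mut scratch.buffers)
                .expect("valid JSON line");
        if let BorrowedValue::Object(obj) = value {
            println!("object with {} field(s)", obj.len());
        }
    }
}
```

In the PR itself, one `Scratch` value is created per `parse_lines` call and threaded through `parse_impl` for every line in the chunk.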