From 500ccd71758c0da68855c5ad6bf56041ee80705e Mon Sep 17 00:00:00 2001 From: Chayim Refael Friedman Date: Mon, 15 Apr 2024 17:25:33 +0300 Subject: [PATCH 1/3] Reuse the `simd_json::Buffers` for NDJSON parsing This simple change speeds up NDJSON reading by 30%. --- crates/polars-io/src/ndjson/core.rs | 27 +++++++++++++++++---------- 1 file changed, 17 insertions(+), 10 deletions(-) diff --git a/crates/polars-io/src/ndjson/core.rs b/crates/polars-io/src/ndjson/core.rs index 2beb1f09d88b..7a58f4b6b80c 100644 --- a/crates/polars-io/src/ndjson/core.rs +++ b/crates/polars-io/src/ndjson/core.rs @@ -366,18 +366,19 @@ impl<'a> CoreJsonReader<'a> { fn parse_impl( bytes: &[u8], buffers: &mut PlIndexMap, - scratch: &mut Vec, + scratch: &mut Scratch, ) -> PolarsResult { - scratch.clear(); - scratch.extend_from_slice(bytes); - let n = scratch.len(); + scratch.json.clear(); + scratch.json.extend_from_slice(bytes); + let n = scratch.json.len(); let all_good = match n { 0 => true, - 1 => scratch[0] == NEWLINE, - 2 => scratch[0] == NEWLINE && scratch[1] == RETURN, + 1 => scratch.json[0] == NEWLINE, + 2 => scratch.json[0] == NEWLINE && scratch.json[1] == RETURN, _ => { - let value: simd_json::BorrowedValue = simd_json::to_borrowed_value(scratch) - .map_err(|e| polars_err!(ComputeError: "error parsing line: {}", e))?; + let value = + simd_json::to_borrowed_value_with_buffers(&mut scratch.json, &mut scratch.buffers) + .map_err(|e| polars_err!(ComputeError: "error parsing line: {}", e))?; match value { simd_json::BorrowedValue::Object(value) => { buffers.iter_mut().try_for_each(|(s, inner)| { @@ -399,8 +400,14 @@ fn parse_impl( Ok(n) } +#[derive(Default)] +struct Scratch { + json: Vec, + buffers: simd_json::Buffers, +} + fn parse_lines(bytes: &[u8], buffers: &mut PlIndexMap) -> PolarsResult<()> { - let mut buf = vec![]; + let mut scratch = Scratch::default(); // The `RawValue` is a pointer to the original JSON string and does not perform any deserialization. 
// It is used to properly iterate over the lines without re-implementing the splitlines logic when this does the same thing. @@ -410,7 +417,7 @@ fn parse_lines(bytes: &[u8], buffers: &mut PlIndexMap) -> Pol match value_result { Ok(value) => { let bytes = value.get().as_bytes(); - parse_impl(bytes, buffers, &mut buf)?; + parse_impl(bytes, buffers, &mut scratch)?; }, Err(e) => { polars_bail!(ComputeError: "error parsing ndjson {}", e) From 79fe29a9fc7451dc8119fb269c7f0acfa50c8d24 Mon Sep 17 00:00:00 2001 From: Chayim Refael Friedman Date: Mon, 15 Apr 2024 21:01:12 +0300 Subject: [PATCH 2/3] Avoid going through `serde_json` while deserializing Previously we used it to delimit the values. While convenient, it was not efficient (see the comment in the code). This gives a 20% speedup. This *could* break people's code since we will not split correctly (and thus error) if one object spans two lines or two objects are in the same line. However, such code was already broken, since NDJSON is not allowed to contain any line breaks. If this is a concern, it is possible (at some perf degradation) to check for `}\n` instead of `\n` alone, and that will make this basically equivalent to the splitting logic we have for threads. As a nice bonus, this allows us to avoid a dependency on `serde_json` for JSON parsing (although we still use it for other things). The original PR that introduced this usage of `serde_json` was #5427. It was done because newline handling wasn't correct. However, as I said above, it is very simple: newlines are not allowed anywhere except between values. And even if we decide we want to handle non-spec-compliant NDJSON, we still don't handle it properly as we can break thread chunks in the middle of a string. The abovementioned PR also said this had massive perf gains. However, I cannot reproduce that. I've checked out the repo at this time, and this PR was a definite regression. 
It is also expected, given that `serde_json::StreamDeserializer` does a lot of additional work, and it also shows up in profiles. It was probably benchmarked incorrectly (maybe with a debug build?). --- crates/polars-io/Cargo.toml | 3 +-- crates/polars-io/src/ndjson/core.rs | 33 +++++++++++++++-------------- 2 files changed, 18 insertions(+), 18 deletions(-) diff --git a/crates/polars-io/Cargo.toml b/crates/polars-io/Cargo.toml index aa2dc674f7a9..da14d8dcc40b 100644 --- a/crates/polars-io/Cargo.toml +++ b/crates/polars-io/Cargo.toml @@ -40,7 +40,7 @@ regex = { workspace = true } reqwest = { workspace = true, optional = true } ryu = { workspace = true, optional = true } serde = { workspace = true, features = ["rc"], optional = true } -serde_json = { version = "1", default-features = false, features = ["alloc", "raw_value", "std"], optional = true } +serde_json = { version = "1", default-features = false, features = ["alloc"], optional = true } simd-json = { workspace = true, optional = true } simdutf8 = { workspace = true, optional = true } smartstring = { workspace = true } @@ -63,7 +63,6 @@ json = [ "polars-json", "simd-json", "atoi_simd", - "serde_json", "dtype-struct", "csv", ] diff --git a/crates/polars-io/src/ndjson/core.rs b/crates/polars-io/src/ndjson/core.rs index 7a58f4b6b80c..167d016dd3ca 100644 --- a/crates/polars-io/src/ndjson/core.rs +++ b/crates/polars-io/src/ndjson/core.rs @@ -259,8 +259,7 @@ impl<'a> CoreJsonReader<'a> { let iter = file_chunks.par_iter().map(|(start_pos, stop_at_nbytes)| { let bytes = &bytes[*start_pos..*stop_at_nbytes]; - let iter = serde_json::Deserializer::from_slice(bytes) - .into_iter::>(); + let iter = json_lines(bytes); iter.count() }); Ok(POOL.install(|| iter.sum())) @@ -406,23 +405,25 @@ struct Scratch { buffers: simd_json::Buffers, } +fn json_lines(bytes: &[u8]) -> impl Iterator { + // This previously used `serde_json`'s `RawValue` to deserialize chunks without really deserializing them. 
+ // However, this convenience comes at a cost. serde_json allocates and parses and does UTF-8 validation, all + // things we don't need since we use simd_json for them. Also, `serde_json::StreamDeserializer` has a more + // ambitious goal: it wants to parse potentially *non-delimited* sequences of JSON values, while we know + // our values are line-delimited. Turns out, custom splitting is very easy, and gives a very nice performance boost. + bytes.split(|&byte| byte == b'\n').filter(|&bytes| { + bytes + .iter() + .any(|&byte| !matches!(byte, b' ' | b'\t' | b'\r')) + }) +} + fn parse_lines(bytes: &[u8], buffers: &mut PlIndexMap) -> PolarsResult<()> { let mut scratch = Scratch::default(); - // The `RawValue` is a pointer to the original JSON string and does not perform any deserialization. - // It is used to properly iterate over the lines without re-implementing the splitlines logic when this does the same thing. - let iter = - serde_json::Deserializer::from_slice(bytes).into_iter::>(); - for value_result in iter { - match value_result { - Ok(value) => { - let bytes = value.get().as_bytes(); - parse_impl(bytes, buffers, &mut scratch)?; - }, - Err(e) => { - polars_bail!(ComputeError: "error parsing ndjson {}", e) - }, - } + let iter = json_lines(bytes); + for bytes in iter { + parse_impl(bytes, buffers, &mut scratch)?; } Ok(()) } From ecf762d6ee3b55b16f2b5dd69efc30f32620ebd1 Mon Sep 17 00:00:00 2001 From: Chayim Refael Friedman Date: Thu, 25 Apr 2024 10:33:43 +0300 Subject: [PATCH 3/3] Remove seemingly unnecessary code This code errors for invalid JSON. But simd_json will already error (and we'll propagate that) for invalid JSON, so I see no reason for that. In addition, a side-effect of that code is that it will also reject some valid JSON: the empty object (`{}`). An empty dataframe seems non-useful, but I see no reason to *forbid* it. Also, the empty object may appear in a non-empty dataframe, to signal an all-null row. 
As a nice side benefit, this also improves perf by 3.5%, but that could be just noise. --- crates/polars-io/src/ndjson/core.rs | 37 ++++++++++------------------- 1 file changed, 13 insertions(+), 24 deletions(-) diff --git a/crates/polars-io/src/ndjson/core.rs b/crates/polars-io/src/ndjson/core.rs index 167d016dd3ca..3390d1004b9d 100644 --- a/crates/polars-io/src/ndjson/core.rs +++ b/crates/polars-io/src/ndjson/core.rs @@ -16,7 +16,6 @@ use crate::predicates::PhysicalIoExpr; use crate::prelude::*; use crate::RowIndex; const NEWLINE: u8 = b'\n'; -const RETURN: u8 = b'\r'; const CLOSING_BRACKET: u8 = b'}'; #[must_use] @@ -370,32 +369,22 @@ fn parse_impl( scratch.json.clear(); scratch.json.extend_from_slice(bytes); let n = scratch.json.len(); - let all_good = match n { - 0 => true, - 1 => scratch.json[0] == NEWLINE, - 2 => scratch.json[0] == NEWLINE && scratch.json[1] == RETURN, + let value = simd_json::to_borrowed_value_with_buffers(&mut scratch.json, &mut scratch.buffers) + .map_err(|e| polars_err!(ComputeError: "error parsing line: {}", e))?; + match value { + simd_json::BorrowedValue::Object(value) => { + buffers.iter_mut().try_for_each(|(s, inner)| { + match s.0.map_lookup(&value) { + Some(v) => inner.add(v)?, + None => inner.add_null(), + } + PolarsResult::Ok(()) + })?; + }, _ => { - let value = - simd_json::to_borrowed_value_with_buffers(&mut scratch.json, &mut scratch.buffers) - .map_err(|e| polars_err!(ComputeError: "error parsing line: {}", e))?; - match value { - simd_json::BorrowedValue::Object(value) => { - buffers.iter_mut().try_for_each(|(s, inner)| { - match s.0.map_lookup(&value) { - Some(v) => inner.add(v)?, - None => inner.add_null(), - } - PolarsResult::Ok(()) - })?; - }, - _ => { - buffers.iter_mut().for_each(|(_, inner)| inner.add_null()); - }, - }; - true + buffers.iter_mut().for_each(|(_, inner)| inner.add_null()); }, }; - polars_ensure!(all_good, ComputeError: "invalid JSON: unexpected end of file"); Ok(n) }