fix(rust, python): properly handle json with unclosed strings #5427

Merged
6 changes: 3 additions & 3 deletions polars/polars-io/Cargo.toml
@@ -11,7 +11,7 @@ description = "IO related logic for the Polars DataFrame library"

[features]
# support for arrows json parsing
json = ["arrow/io_json", "simd-json", "memmap", "lexical", "lexical-core", "csv-core"]
json = ["arrow/io_json", "simd-json", "memmap", "lexical", "lexical-core", "csv-core", "serde_json"]
# support for arrows ipc file parsing
ipc = ["arrow/io_ipc", "arrow/io_ipc_compression", "memmap"]
# support for arrows streaming ipc file parsing
@@ -56,8 +56,8 @@ polars-utils = { version = "0.25.1", path = "../polars-utils" }
rayon.workspace = true
regex = "1.6"
serde = { version = "1", features = ["derive"], optional = true }
serde_json = { version = "1", optional = true, default-features = false, features = ["alloc"] }
simd-json = { version = "0.6.0", optional = true, features = ["allow-non-simd", "known-key"] }
serde_json = { version = "1", optional = true, default-features = false, features = ["alloc", "raw_value"] }
simd-json = { version = "0.7.0", optional = true, features = ["allow-non-simd", "known-key"] }
simdutf8 = "0.1"

[dev-dependencies]
128 changes: 96 additions & 32 deletions polars/polars-io/src/ndjson_core/ndjson.rs
@@ -5,20 +5,19 @@ use std::path::PathBuf;

pub use arrow::array::StructArray;
pub use arrow::io::ndjson as arrow_ndjson;
use num::traits::Pow;
use polars_core::prelude::*;
use polars_core::utils::accumulate_dataframes_vertical;
use polars_core::POOL;
use rayon::prelude::*;

use crate::csv::parser::*;
use crate::csv::utils::*;
use crate::mmap::{MmapBytesReader, ReaderBytes};
use crate::ndjson_core::buffer::*;
use crate::prelude::*;
const QUOTE_CHAR: u8 = b'"';
const SEP: u8 = b',';
const NEWLINE: u8 = b'\n';
const RETURN: u8 = b'\r';
const CLOSING_BRACKET: u8 = b'}';

#[must_use]
pub struct JsonLineReader<'a, R>
@@ -180,14 +179,7 @@ impl<'a> CoreJsonReader<'a> {
let mut bytes = bytes;
let mut total_rows = 128;

if let Some((mean, std)) = get_line_stats(
bytes,
self.sample_size,
NEWLINE,
self.schema.len(),
SEP,
None,
) {
if let Some((mean, std)) = get_line_stats_json(bytes, self.sample_size) {
let line_length_upper_bound = mean + 1.1 * std;

total_rows = (bytes.len() as f32 / (mean - 0.01 * std)) as usize;
@@ -197,7 +189,7 @@
let n_bytes = (line_length_upper_bound * (n_rows as f32)) as usize;

if n_bytes < bytes.len() {
if let Some(pos) = next_line_position_naive(&bytes[n_bytes..], NEWLINE) {
if let Some(pos) = next_line_position_naive_json(&bytes[n_bytes..]) {
bytes = &bytes[..n_bytes + pos]
}
}
@@ -217,16 +209,7 @@
std::cmp::min(rows_per_thread, max_proxy)
};

let expected_fields = &self.schema.len();

let file_chunks = get_file_chunks(
bytes,
n_threads,
*expected_fields + 1,
SEP,
Some(QUOTE_CHAR),
NEWLINE,
);
let file_chunks = get_file_chunks_json(bytes, n_threads);
let dfs = POOL.install(|| {
file_chunks
.into_par_iter()
@@ -324,16 +307,14 @@ fn parse_lines<'a>(

let total_bytes = bytes.len();
let mut offset = 0;
for line in SplitLines::new(bytes, QUOTE_CHAR, NEWLINE) {
offset += 1; // the newline
offset += parse_impl(line, buffers, &mut buf)?;
}

// if file doesn't end with a newline, parse the last line
if offset < total_bytes {
let rem = &bytes[offset..];
offset += rem.len();
parse_impl(rem, buffers, &mut buf)?;
// `RawValue` is a zero-copy pointer into the original JSON buffer and performs no deserialization.
// It lets us iterate over the records without re-implementing the line-splitting logic, since the
// streaming deserializer already handles quotes and escapes correctly.
let mut iter =
serde_json::Deserializer::from_slice(bytes).into_iter::<Box<serde_json::value::RawValue>>();
while let Some(Ok(value)) = iter.next() {
let bytes = value.get().as_bytes();
offset += bytes.len();
parse_impl(bytes, buffers, &mut buf)?;
}

if offset != total_bytes {
@@ -344,3 +325,86 @@

Ok(())
}
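
For readers unfamiliar with `serde_json`'s streaming API, here is a minimal, self-contained sketch of the iteration pattern used above (illustrative, not part of this PR). It assumes `serde_json` is built with the `raw_value` feature — exactly what the Cargo.toml change at the top of this diff enables. Because the streaming deserializer tokenizes each top-level value, escaped quotes and newlines inside strings no longer look like record boundaries:

```rust
use serde_json::value::RawValue;

fn main() {
    // Two NDJSON records: the first contains an escaped quote, the
    // second an escaped newline — both broke the old byte-level
    // line splitting.
    let ndjson: &[u8] = br#"{"id": 1, "text": "\""}
{"id": 2, "text": "line\nbreak"}"#;

    let iter = serde_json::Deserializer::from_slice(ndjson)
        .into_iter::<Box<RawValue>>();

    for value in iter {
        // Each `RawValue` is a zero-copy slice of the input; `get()`
        // returns the raw text of one record without deserializing it.
        println!("record: {}", value.expect("valid record").get());
    }
}
```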

/// Find the nearest next line position.
/// Does not check for newline characters embedded in string fields;
/// it simply looks for the byte sequence `}\n`.
pub(crate) fn next_line_position_naive_json(input: &[u8]) -> Option<usize> {
let pos = memchr::memchr(NEWLINE, input)?;
if pos == 0 {
return Some(1);
}

let is_closing_bracket = input.get(pos - 1) == Some(&CLOSING_BRACKET);
if is_closing_bracket {
Some(pos + 1)
} else {
None
}
}
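
Illustrative assertions for this heuristic (assuming crate-internal access; these are not tests added by the PR). A newline counts as a boundary only when the preceding byte is `}`, so a newline in the middle of a record makes the probe report `None` rather than split the record:

```rust
#[test]
fn next_line_position_naive_json_heuristic() {
    // A complete record: `}` at index 6, `\n` at index 7 => boundary at 8.
    assert_eq!(next_line_position_naive_json(b"{\"a\":1}\n{\"a\":2}"), Some(8));
    // The newline is not preceded by `}`, so no boundary is reported.
    assert_eq!(next_line_position_naive_json(b"{\"a\":\"x\ny\"}"), None);
    // No newline at all.
    assert_eq!(next_line_position_naive_json(b"no newline"), None);
}
```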

/// Get the mean and standard deviation of line lengths in bytes.
pub(crate) fn get_line_stats_json(bytes: &[u8], n_lines: usize) -> Option<(f32, f32)> {
let mut lengths = Vec::with_capacity(n_lines);

let mut bytes_trunc;
let n_lines_per_iter = n_lines / 2;

let mut n_read = 0;

// sample from the start of the file and from 75% of the way in
for offset in [0, (bytes.len() as f32 * 0.75) as usize] {
bytes_trunc = &bytes[offset..];
let pos = next_line_position_naive_json(bytes_trunc)?;
bytes_trunc = &bytes_trunc[pos + 1..];

for _ in offset..(offset + n_lines_per_iter) {
let pos = next_line_position_naive_json(bytes_trunc);
if let Some(pos) = pos {
lengths.push(pos);
let next_bytes = &bytes_trunc[pos..];
if next_bytes.is_empty() {
return None;
}
bytes_trunc = next_bytes;
n_read += pos;
} else {
break;
}
}
}

let n_samples = lengths.len();
let mean = (n_read as f32) / (n_samples as f32);
let mut std = 0.0;
for &len in lengths.iter() {
std += (len as f32 - mean).pow(2.0)
}
std = (std / n_samples as f32).sqrt();
Some((mean, std))
}
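
A rough worked example of how these statistics feed the row-count estimate in the `CoreJsonReader` changes above (numbers invented for illustration):

```rust
fn main() {
    // Hypothetical sample statistics: mean line length 100 bytes,
    // standard deviation 10 bytes, over a 1 MB file.
    let (mean, std) = (100.0_f32, 10.0_f32);
    let file_len = 1_000_000usize;

    // Mirrors the arithmetic in the reader above.
    let line_length_upper_bound = mean + 1.1 * std; // 111.0 bytes
    let total_rows = (file_len as f32 / (mean - 0.01 * std)) as usize;

    println!("per-line upper bound: {line_length_upper_bound} bytes");
    // Dividing by slightly less than the mean deliberately overestimates
    // the row count: 1_000_000 / 99.9 ≈ 10_010.
    assert_eq!(total_rows, 10_010);
}
```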

pub(crate) fn get_file_chunks_json(bytes: &[u8], n_threads: usize) -> Vec<(usize, usize)> {
let mut last_pos = 0;
let total_len = bytes.len();
let chunk_size = total_len / n_threads;
let mut offsets = Vec::with_capacity(n_threads);
for _ in 0..n_threads {
let search_pos = last_pos + chunk_size;

if search_pos >= bytes.len() {
break;
}

let end_pos = match next_line_position_naive_json(&bytes[search_pos..]) {
Some(pos) => search_pos + pos,
None => {
break;
}
};
offsets.push((last_pos, end_pos));
last_pos = end_pos;
}
offsets.push((last_pos, total_len));
offsets
}
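
Finally, a minimal sketch of how the chunker behaves (hypothetical input, assuming crate-internal access). Every `(start, end)` pair ends on a `}\n` boundary found by `next_line_position_naive_json`, so each rayon worker receives only whole records:

```rust
fn main() {
    // Three 8-byte records split across two hypothetical threads.
    let bytes: &[u8] = b"{\"a\":1}\n{\"a\":2}\n{\"a\":3}\n";
    let chunks = get_file_chunks_json(bytes, 2);

    // The first boundary probe starts at the naive midpoint (byte 12)
    // and advances to the end of the second record, so the split is
    // uneven but always record-aligned.
    assert_eq!(chunks, vec![(0, 16), (16, 24)]);

    for (start, end) in chunks {
        // In the reader these slices are parsed in parallel via rayon.
        let _chunk = &bytes[start..end];
    }
}
```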
25 changes: 25 additions & 0 deletions polars/tests/it/io/json.rs
@@ -57,6 +57,31 @@ fn read_json_with_whitespace() {
assert_eq!("d", df.get_columns()[3].name());
assert_eq!((12, 4), df.shape());
}
#[test]
fn read_json_with_escapes() {
let escaped_json = r#"{"id": 1, "text": "\""}
{"text": "\n{\n\t\t\"inner\": \"json\n}\n", "id": 10}
{"id": 0, "text":"\"","date":"2013-08-03 15:17:23"}
{"id": 1, "text":"\"123\"","date":"2009-05-19 21:07:53"}
{"id": 2, "text":"/....","date":"2009-05-19 21:07:53"}
{"id": 3, "text":"\n\n..","date":"2"}
{"id": 4, "text":"\"'/\n...","date":"2009-05-19 21:07:53"}
{"id": 5, "text":".h\"h1hh\\21hi1e2emm...","date":"2009-05-19 21:07:53"}
{"id": 6, "text":"xxxx....","date":"2009-05-19 21:07:53"}
{"id": 7, "text":".\"quoted text\".","date":"2009-05-19 21:07:53"}

"#;
let file = Cursor::new(escaped_json);
let df = JsonLineReader::new(file)
.infer_schema_len(Some(6))
.finish()
.unwrap();
assert_eq!("id", df.get_columns()[0].name());
assert_eq!(AnyValue::Utf8("\""), df.column("text").unwrap().get(0));
assert_eq!("text", df.get_columns()[1].name());
assert_eq!((10, 3), df.shape());
}

#[test]
fn read_unordered_json() {
let unordered_json = r#"{"a":1, "b":2.0, "c":false, "d":"4"}