Merge pull request #2370 from dathere/2369-joinp_cache-schema
`joinp`: refactor `--cache-schema` option
jqnatividad authored Dec 23, 2024
2 parents 4e0cb88 + 0b065b7 commit 5964047
Showing 2 changed files with 216 additions and 51 deletions.
171 changes: 121 additions & 50 deletions src/cmd/joinp.rs
@@ -96,17 +96,22 @@ joinp options:
--infer-len <arg> The number of rows to scan when inferring the schema of the CSV.
Set to 0 to do a full table scan (warning: very slow).
[default: 10000]
--cache-schema Create and cache Polars schema JSON files.
If specified and the schema file/s do not exist, it will check if a
stats cache is available. If so, it will use it to derive a Polars schema
and save it. If there's no stats cache, it will infer the schema
using --infer-len and save the inferred schemas.
Each schema file will have the same file stem as the corresponding
input file, with the extension ".pschema.json"
(data.csv's Polars schema file will be data.pschema.json)
If the file/s exists, it will load the schema instead of inferring it
(ignoring --infer-len) and attempt to use it for each corresponding
Polars "table" with the same file stem.
--cache-schema <arg>  Create and cache Polars schema JSON files.
                      -2: treat all columns as String. A Polars schema file is created & cached.
                      -1: treat all columns as String. No Polars schema file is created.
                       0: do not cache Polars schema.
                       1: cache Polars schema.
                      If set to 1 and the schema file/s do not exist, it will check if a
                      stats cache is available. If so, it will use it to derive a Polars
                      schema and save it. If there's no stats cache, it will infer the
                      schema using --infer-len and save the inferred schemas.
                      Each schema file will have the same file stem as the corresponding
                      input file, with the extension ".pschema.json"
                      (data.csv's Polars schema file will be data.pschema.json).
                      If the file/s exist, it will load the schema instead of inferring it
                      (ignoring --infer-len) and attempt to use it for each corresponding
                      Polars "table" with the same file stem.
                      [default: 0]
--low-memory Use low memory mode when parsing CSVs. This will use less memory
but will be slower. It will also process the join in streaming mode.
Only use this when you get out of memory errors.
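To make the four mode values above concrete, here is a minimal sketch of the dispatch they imply, assuming only std. `CacheSchemaMode` is a hypothetical enum for illustration only — the actual code further down matches on the raw i8 directly:

    #[derive(Debug, PartialEq)]
    enum CacheSchemaMode {
        AllStringCached, // -2: force String dtypes and write a .pschema.json
        AllString,       // -1: force String dtypes, write nothing
        NoCache,         //  0: infer with --infer-len, cache nothing
        Cache,           //  1: derive/infer a schema and cache it
    }

    impl TryFrom<i8> for CacheSchemaMode {
        type Error = String;

        fn try_from(v: i8) -> Result<Self, Self::Error> {
            match v {
                -2 => Ok(Self::AllStringCached),
                -1 => Ok(Self::AllString),
                0 => Ok(Self::NoCache),
                1 => Ok(Self::Cache),
                other => Err(format!(
                    "Invalid --cache-schema value: {other}. Valid values are 0, 1, -1 and -2"
                )),
            }
        }
    }

    fn main() {
        assert_eq!(CacheSchemaMode::try_from(1), Ok(CacheSchemaMode::Cache));
        assert!(CacheSchemaMode::try_from(2).is_err());
    }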
@@ -238,7 +243,7 @@ struct Args {
flag_try_parsedates: bool,
flag_decimal_comma: bool,
flag_infer_len: usize,
flag_cache_schema: bool,
flag_cache_schema: i8,
flag_low_memory: bool,
flag_no_optimizations: bool,
flag_ignore_errors: bool,
@@ -346,7 +351,7 @@ pub fn run(argv: &[&str]) -> CliResult<()> {
Some("backward") | None => AsofStrategy::Backward,
Some("forward") => AsofStrategy::Forward,
Some("nearest") => AsofStrategy::Nearest,
Some(s) => return fail_clierror!("Invalid asof strategy: {}", s),
Some(s) => return fail_incorrectusage_clierror!("Invalid asof strategy: {}", s),
};

let mut asof_options = AsOfOptions {
@@ -634,8 +639,6 @@ impl JoinStruct {

impl Args {
fn new_join(&mut self, tmpdir: &tempfile::TempDir) -> CliResult<JoinStruct> {
// =============== NEW_JOIN HELPER FUNCTIONS =================

// Helper function to create a LazyFrameReader with common settings
fn create_lazy_reader(
file_path: &str,
Expand Down Expand Up @@ -714,8 +717,31 @@ impl Args {
Ok(schema)
}

// Helper function to setup a LazyFrame with schema handling
#[inline]
/// Helper function to setup a LazyFrame with schema handling based on cache_schema flag.
///
/// # Arguments
/// * `input_path` - Path to the input CSV file
/// * `comment_char` - Optional comment character to ignore lines starting with it
/// * `args` - Command line arguments containing schema caching and other options
/// * `delim` - Delimiter character for CSV parsing
/// * `debuglog_flag` - Whether debug logging is enabled
///
/// # Returns
/// Returns a tuple containing:
/// * The configured LazyFrame for reading the CSV
/// * A boolean indicating if a new schema needs to be created and cached
///
/// # Schema Caching Modes
/// * `0` - No schema caching, infer schema from data sample using Polars
/// * `1` - Derive the schema from the stats cache (inferring it if no stats cache exists) and save it to a .pschema.json file
/// * `-1` - Use string schema for all columns without caching
/// * `-2` - Use string schema for all columns and cache it
///
/// # Errors
/// Returns error if:
/// * File operations fail
/// * Schema parsing fails
/// * Invalid cache_schema value provided
fn setup_lazy_frame(
input_path: &Path,
comment_char: Option<&PlSmallStr>,
@@ -724,47 +750,92 @@
debuglog_flag: bool,
) -> CliResult<(LazyFrame, bool)> {
let schema_file = input_path.canonicalize()?.with_extension("pschema.json");
let mut create_schema = args.flag_cache_schema;
let mut create_schema = false;
let cache_schema = args.flag_cache_schema;

let mut reader =
create_lazy_reader(input_path.to_str().unwrap(), comment_char, args, delim);

if create_schema {
let mut valid_schema_exists = schema_file.exists()
&& schema_file.metadata()?.modified()? > input_path.metadata()?.modified()?;

if !valid_schema_exists {
let schema = create_schema_from_stats(input_path, args)?;
let stats_schema = Arc::new(schema);
let stats_schema_json = serde_json::to_string_pretty(&stats_schema)?;
match cache_schema {
0 => {
reader = reader.with_infer_schema_length(if args.flag_infer_len == 0 {
None
} else {
Some(args.flag_infer_len)
});
},
1 => {
let mut valid_schema_exists = schema_file.exists()
&& schema_file.metadata()?.modified()?
> input_path.metadata()?.modified()?;

if !valid_schema_exists {
let schema = create_schema_from_stats(input_path, args)?;
let stats_schema = Arc::new(schema);
let stats_schema_json = serde_json::to_string_pretty(&stats_schema)?;

let mut file = BufWriter::new(File::create(&schema_file)?);
file.write_all(stats_schema_json.as_bytes())?;
file.flush()?;
if debuglog_flag {
log::debug!("Saved schema to file: {}", schema_file.display());
}
valid_schema_exists = true;
}

let mut file = BufWriter::new(File::create(&schema_file)?);
file.write_all(stats_schema_json.as_bytes())?;
file.flush()?;
if debuglog_flag {
log::debug!("Saved schema to file: {}", schema_file.display());
if valid_schema_exists {
let file = File::open(&schema_file)?;
let mut buf_reader = BufReader::new(file);
let mut schema_json = String::with_capacity(100);
buf_reader.read_to_string(&mut schema_json)?;
let schema: Schema = serde_json::from_str(&schema_json)?;
reader = reader.with_schema(Some(Arc::new(schema)));
create_schema = false;
} else {
reader = reader.with_infer_schema_length(Some(args.flag_infer_len));
create_schema = true;
}
valid_schema_exists = true;
}
},
-1 | -2 => {
// get the headers from the input file
let mut rdr = csv::Reader::from_path(input_path)?;
let csv_fields = rdr.byte_headers()?.clone();
drop(rdr);

let mut schema = Schema::with_capacity(csv_fields.len());
for field in &csv_fields {
schema.insert(
PlSmallStr::from_str(simdutf8::basic::from_utf8(field).unwrap()),
polars::datatypes::DataType::String,
);
}
let allstring_schema = Arc::new(schema);

if valid_schema_exists {
let file = File::open(&schema_file)?;
let mut buf_reader = BufReader::new(file);
let mut schema_json = String::with_capacity(100);
buf_reader.read_to_string(&mut schema_json)?;
let schema: Schema = serde_json::from_str(&schema_json)?;
reader = reader.with_schema(Some(Arc::new(schema)));
reader = reader.with_schema(Some(allstring_schema.clone()));
create_schema = false;
} else {
reader = reader.with_infer_schema_length(Some(args.flag_infer_len));
create_schema = true;
}
} else {
reader = reader.with_infer_schema_length(if args.flag_infer_len == 0 {
None
} else {
Some(args.flag_infer_len)
});

// create and cache allstring schema
if cache_schema == -2 {
let allstring_schema_json =
serde_json::to_string_pretty(&allstring_schema)?;

let mut file = BufWriter::new(File::create(&schema_file)?);
file.write_all(allstring_schema_json.as_bytes())?;
file.flush()?;
if debuglog_flag {
log::debug!(
"Saved allstring_schema to file: {}",
schema_file.display()
);
}
}
},
_ => {
return fail_incorrectusage_clierror!(
"Invalid --cache-schema value: {cache_schema}. Valid values are 0, 1, -1 \
and -2"
)
},
}

Ok((reader.finish()?, create_schema))
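Taken together, the new match arms implement a simple round-trip: build (or load) a schema, serialize it next to the input as .pschema.json, and hand it to the LazyFrame reader. Below is a self-contained sketch of the -2 path, assuming the same crates the command uses (polars with its serde feature, csv, serde_json) plus anyhow for error plumbing; `cache_allstring_schema` is a hypothetical helper for illustration, not joinp's API, and unlike the diff it uses std's UTF-8 check rather than simdutf8:

    use std::{
        fs::File,
        io::{BufReader, BufWriter, Read, Write},
        path::Path,
        sync::Arc,
    };

    use polars::prelude::*;

    fn cache_allstring_schema(input_path: &Path) -> anyhow::Result<Arc<Schema>> {
        // Read just the headers from the CSV.
        let mut rdr = csv::Reader::from_path(input_path)?;
        let csv_fields = rdr.byte_headers()?.clone();
        drop(rdr);

        // Build a schema that types every column as String.
        let mut schema = Schema::with_capacity(csv_fields.len());
        for field in &csv_fields {
            schema.insert(
                PlSmallStr::from_str(std::str::from_utf8(field)?),
                DataType::String,
            );
        }

        // Cache it next to the input with the ".pschema.json" extension,
        // e.g. data.csv -> data.pschema.json.
        let schema_file = input_path.with_extension("pschema.json");
        let mut file = BufWriter::new(File::create(&schema_file)?);
        file.write_all(serde_json::to_string_pretty(&schema)?.as_bytes())?;
        file.flush()?;

        // Read it back, as a later run that finds a valid cached schema would.
        let mut schema_json = String::new();
        BufReader::new(File::open(&schema_file)?).read_to_string(&mut schema_json)?;
        let cached: Schema = serde_json::from_str(&schema_json)?;
        Ok(Arc::new(cached))
    }

The -1 path is the same minus the file write; mode 1 differs only in where the schema comes from (the stats cache, or Polars inference, instead of all-String).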
96 changes: 95 additions & 1 deletion tests/test_joinp.rs
@@ -33,7 +33,14 @@ macro_rules! joinp_test_cache_schema {
fn headers() {
let wrk = setup(stringify!($name));
let mut cmd = wrk.command("joinp");
cmd.args(&["city", "cities.csv", "city", "places.csv", "--cache-schema"]);
cmd.args(&[
"city",
"cities.csv",
"city",
"places.csv",
"--cache-schema",
"1",
]);
$fun(wrk, cmd);
}
}
@@ -1642,3 +1649,90 @@ fn joinp_filter_pattern_matching() {
];
assert_eq!(got, expected);
}

#[test]
fn test_joinp_cache_schema() {
let wrk = Workdir::new("joinp_cache_schema");

// Create test files based on issue #2369
wrk.create(
"left.csv",
vec![
svec!["id", "has_text", "col3", "col4", "col5"],
svec!["1", "1", "a", "b", "c"],
svec!["2", "0", "d", "e", "f"],
svec!["3", "1", "g", "h", "i"],
svec!["4", "0", "j", "k", "l"],
svec!["5", "1", "m", "n", "o"],
],
);

wrk.create(
"right.csv",
vec![
svec!["id", "has_text"],
svec!["1", "1"],
svec!["2", "0"],
svec!["4", "0"],
],
);

// Test 1: No schema caching (default)
let mut cmd = wrk.command("joinp");
cmd.args(&["has_text", "left.csv", "has_text", "right.csv"]);
wrk.assert_success(&mut cmd);

let got: Vec<Vec<String>> = wrk.read_stdout(&mut cmd);
let expected = vec![
svec!["id", "has_text", "col3", "col4", "col5", "id_right"],
svec!["1", "1", "a", "b", "c", "1"],
svec!["2", "0", "d", "e", "f", "2"],
svec!["2", "0", "d", "e", "f", "4"],
svec!["3", "1", "g", "h", "i", "1"],
svec!["4", "0", "j", "k", "l", "2"],
svec!["4", "0", "j", "k", "l", "4"],
svec!["5", "1", "m", "n", "o", "1"],
];
assert_eq!(got, expected);

// Test 2: Cache inferred schema
let mut cmd = wrk.command("joinp");
cmd.args(&["has_text", "left.csv", "has_text", "right.csv"])
.arg("--cache-schema")
.arg("1");

// error is expected as has_text is interpreted as bool, rather than a number or just a string
// recreates error reported in https://github.com/dathere/qsv/issues/2369
wrk.assert_err(&mut cmd);

// Verify schema files were created
assert!(wrk.path("left.pschema.json").exists());
assert!(wrk.path("right.pschema.json").exists());

// Test 3: Use string schema for all columns
let mut cmd = wrk.command("joinp");
cmd.args(&["has_text", "left.csv", "has_text", "right.csv"])
.arg("--cache-schema")
.arg("-1");
wrk.assert_success(&mut cmd);

let got: Vec<Vec<String>> = wrk.read_stdout(&mut cmd);
assert_eq!(got, expected);

// Test 4: Use and cache string schema
let mut cmd = wrk.command("joinp");
cmd.args(&["has_text", "left.csv", "has_text", "right.csv"])
.arg("--cache-schema")
.arg("-2");
wrk.assert_success(&mut cmd);

let got: Vec<Vec<String>> = wrk.read_stdout(&mut cmd);
assert_eq!(got, expected);

// Test 5: Invalid cache-schema value
let mut cmd = wrk.command("joinp");
cmd.args(&["has_text", "left.csv", "has_text", "right.csv"])
.arg("--cache-schema")
.arg("2");
wrk.assert_err(&mut cmd);
}
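One detail the mode-1 test exercises implicitly: a cached schema is only honored when it is newer than its CSV (setup_lazy_frame compares modification times before trusting the file). A minimal sketch of that validity rule using only std; `schema_cache_is_valid` is a hypothetical name, and unlike joinp it skips the canonicalize step:

    use std::{fs, io, path::Path};

    fn schema_cache_is_valid(input_path: &Path) -> io::Result<bool> {
        let schema_file = input_path.with_extension("pschema.json");
        if !schema_file.exists() {
            return Ok(false);
        }
        // A schema cached before the CSV was last modified is stale and will
        // be rebuilt (mode 1) or overwritten (mode -2) on the next run.
        Ok(fs::metadata(&schema_file)?.modified()?
            > fs::metadata(input_path)?.modified()?)
    }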
