diff --git a/src/cli.rs b/src/cli.rs index 2665a8d..3f8c0a4 100644 --- a/src/cli.rs +++ b/src/cli.rs @@ -5,9 +5,8 @@ use crate::{ REQUIRED_DATATYPE_COLUMNS, SQL_PARAM, }; use ansi_term::Style; -use anyhow::Result; use clap::{ArgAction, Parser, Subcommand}; -use futures::{executor::block_on, TryStreamExt}; +use futures::TryStreamExt; use serde_json::{json, Value as SerdeValue}; use sqlx::{query as sqlx_query, Row}; use std::{collections::HashMap, io}; @@ -561,8 +560,876 @@ pub enum RenameSubcommands { }, } +/// TODO: Add docstring +pub async fn build_valve(source: &str, database: &str, verbose: bool, batch_mode: bool) -> Valve { + let mut valve = Valve::build(&source, &database).await.expect(BUILD_ERROR); + valve.set_verbose(verbose); + valve.set_interactive(!batch_mode); + valve +} + +/// TODO: Add docstring +pub async fn add_column(cli: &Cli, table: &Option, column: &Option, no_load: bool) { + let mut valve = build_valve(&cli.source, &cli.database, cli.verbose, cli.assume_yes).await; + let json_row = read_json_row_for_table(&valve, "column"); + let column_json = extract_column_fields(&json_row, table, column); + valve + .add_column(table, column, &column_json, no_load) + .await + .expect("Error adding column"); +} + +/// TODO: Add docstring +pub async fn add_datatype(cli: &Cli, datatype: &Option) { + let mut valve = build_valve(&cli.source, &cli.database, cli.verbose, cli.assume_yes).await; + let json_datatype = read_json_row_for_table(&valve, "datatype"); + let dt_fields = extract_datatype_fields(&valve, datatype, &json_datatype); + valve + .add_datatype(&dt_fields) + .await + .expect("Error adding datatype"); +} + +/// TODO: Add docstring +pub async fn add_message( + cli: &Cli, + table: &Option, + row: &Option, + column: &Option, +) { + let valve = build_valve(&cli.source, &cli.database, cli.verbose, cli.assume_yes).await; + let json_message = read_json_row_for_table(&valve, "message"); + let (table, row, column, value, level, rule, message) = + extract_message_fields(table, row, column, &json_message); + let message_id = valve + .insert_message(&table, row, &column, &value, &level, &rule, &message) + .await + .expect("Error inserting message"); + println!("{message_id}"); +} + +/// TODO: Add docstring +pub async fn add_row(cli: &Cli, table: &str) { + let valve = build_valve(&cli.source, &cli.database, cli.verbose, cli.assume_yes).await; + let json_row = read_json_row_for_table(&valve, table); + let (_, row) = valve + .insert_row(table, &json_row) + .await + .expect("Error inserting row"); + if cli.verbose { + println!( + "{}", + json!(row + .to_rich_json() + .expect("Error converting row to rich JSON")) + ); + } +} + +/// TODO: Add docstring +pub async fn add_table( + cli: &Cli, + table: &str, + path: &str, + sample_size: &usize, + error_rate: &f32, + seed: &Option, + no_load: bool, +) { + let mut valve = build_valve(&cli.source, &cli.database, cli.verbose, cli.assume_yes).await; + valve + .add_table(table, path, sample_size, error_rate, seed, no_load) + .await + .expect("Error adding table"); +} + +/// TODO: Add docstring +pub async fn create_all(cli: &Cli) { + let mut valve = build_valve(&cli.source, &cli.database, cli.verbose, cli.assume_yes).await; + // We turn interactive mode off since this is an "all" operation: + valve.set_interactive(false); + valve + .truncate_all_tables() + .await + .expect("Error truncating tables"); + valve + .ensure_all_tables_created() + .await + .expect("Error ensuring that all tables are created"); +} + +/// TODO: Add docstring +pub async fn delete_datatype(cli: &Cli, datatype: &str) { + let mut valve = build_valve(&cli.source, &cli.database, cli.verbose, cli.assume_yes).await; + valve + .delete_datatype(datatype) + .await + .expect("Error deleting datatype"); +} + +/// TODO: Add docstring +pub async fn delete_column(cli: &Cli, table: &str, column: &str, no_load: bool) { + let mut valve = build_valve(&cli.source, &cli.database, cli.verbose, cli.assume_yes).await; + valve + .delete_column(table, column, no_load) + .await + .expect("Error deleting column"); +} + +/// TODO: Add docstring +pub async fn delete_messages_by_id_or_rule( + cli: &Cli, + message_id: &Option, + rule: &Option, +) { + let valve = build_valve(&cli.source, &cli.database, cli.verbose, cli.assume_yes).await; + if let Some(message_id) = message_id { + valve + .delete_message(*message_id) + .await + .expect("Error deleting message"); + } else { + match rule { + Some(rule) => valve + .delete_messages_like(rule) + .await + .expect("Error deleting messages"), + None => panic!( + "Either a MESSAGE_ID or a RULE (possibly with wildcards) \ + is required. To delete all messages use the option '--rule %'." + ), + } + } +} + +/// TODO: Add docstring +pub async fn delete_table(cli: &Cli, table: &str, no_drop: bool) { + let mut valve = build_valve(&cli.source, &cli.database, cli.verbose, cli.assume_yes).await; + valve + .delete_table(table, no_drop) + .await + .expect("Error deleting table"); +} + +/// TODO: Add docstring +pub async fn delete_rows(cli: &Cli, table: &str, rows: &Vec) { + let valve = build_valve(&cli.source, &cli.database, cli.verbose, cli.assume_yes).await; + for row in rows { + valve + .delete_row(table, row) + .await + .expect("Error deleting row"); + } +} + +/// TODO: Add docstring +pub async fn drop_all_tables(cli: &Cli) { + let mut valve = build_valve(&cli.source, &cli.database, cli.verbose, cli.assume_yes).await; + valve + .drop_all_tables() + .await + .expect("Error dropping all tables"); +} + +/// TODO: Add docstring +pub async fn drop_table(cli: &Cli, table: &str) { + let valve = build_valve(&cli.source, &cli.database, cli.verbose, cli.assume_yes).await; + valve + .drop_tables(&vec![table]) + .await + .expect("Error dropping tables"); +} + +/// TODO: Add docstring +pub async fn print_ancestors(cli: &Cli, datatype: &str) { + let valve = build_valve(&cli.source, &cli.database, cli.verbose, cli.assume_yes).await; + + println!( + "{}", + valve + .get_datatype_ancestor_names(datatype) + .iter() + .map(|name| { + if name.contains(" ") { + format!("'{name}'") + } else { + name.to_string() + } + }) + .collect::>() + .join(" ") + ); +} + +/// TODO: Add docstring +pub async fn print_cell(cli: &Cli, table: &str, row: u32, column: &str) { + let valve = build_valve(&cli.source, &cli.database, cli.verbose, cli.assume_yes).await; + let cell = valve + .get_cell_from_db(table, &row, column) + .await + .expect("Error getting cell"); + println!( + "{}", + json!(cell + .to_rich_json() + .expect("Error converting cell to rich JSON")) + ); +} + +/// TODO: Add docstring +pub async fn print_column_config(cli: &Cli, table: &str, column: &str) { + let valve = build_valve(&cli.source, &cli.database, cli.verbose, cli.assume_yes).await; + let column_config = valve + .config + .table + .get(table) + .expect(&format!("Table '{table}' not found")) + .column + .get(column) + .expect(&format!("Column '{column}' not found")); + println!("{}", json!(column_config)); +} + +/// TODO: Add docstring +pub async fn print_constraints(cli: &Cli, table: &str) { + let valve = build_valve(&cli.source, &cli.database, cli.verbose, cli.assume_yes).await; + let mut table_constraints = HashMap::new(); + table_constraints.insert( + "primary", + json!(valve + .config + .constraint + .primary + .get(table) + .expect(&format!("No table '{table}'"))), + ); + table_constraints.insert( + "unique", + json!(valve + .config + .constraint + .unique + .get(table) + .expect(&format!("No table '{table}'"))), + ); + table_constraints.insert( + "foreign", + json!(valve + .config + .constraint + .foreign + .get(table) + .expect(&format!("No table '{table}'"))), + ); + table_constraints.insert( + "tree", + json!(valve + .config + .constraint + .tree + .get(table) + .expect(&format!("No table '{table}'"))), + ); + println!("{}", json!(table_constraints)); +} + +/// TODO: Add docstring +pub async fn print_datatype_config(cli: &Cli, datatype: &str) { + let valve = build_valve(&cli.source, &cli.database, cli.verbose, cli.assume_yes).await; + let dt_config = valve + .config + .datatype + .get(datatype) + .expect(&format!("Datatype '{datatype}' not found")); + println!("{}", json!(dt_config)); +} + +/// TODO: Add docstring +pub async fn print_messages( + cli: &Cli, + table: &Option, + row: &Option, + column: &Option, + rule: &Option, + message_id: &Option, +) { + let valve = build_valve(&cli.source, &cli.database, cli.verbose, cli.assume_yes).await; + + let mut sql = format!( + r#"SELECT "message_id", + "table", "row", "column", "value", "level", "rule", "message" + FROM "message""# + ); + let mut sql_params = vec![]; + match message_id { + Some(message_id) => { + sql.push_str(&format!(r#" WHERE "message_id" = {message_id}"#)); + } + None => { + if let Some(table) = table { + sql.push_str(&format!(r#"WHERE "table" = {SQL_PARAM}"#)); + sql_params.push(table); + } + // The command-line parser will ensure that TABLE has been given + // whenever ROW is given, and that TABLE and ROW have both been given + // whenever COLUMN is given. The case of RULE is different since it is + // a long parameter that is parsed independently. + if let Some(row) = row { + sql.push_str(&format!(r#" AND "row" = {row}"#)); + } + if let Some(column) = column { + sql.push_str(&format!(r#" AND "column" = {SQL_PARAM}"#)); + sql_params.push(column); + } + if let Some(rule) = rule { + sql.push_str(&format!( + r#" {connective} "rule" LIKE {SQL_PARAM}"#, + connective = match table { + None => "WHERE", + Some(_) => "AND", + } + )); + sql_params.push(rule); + } + } + }; + let sql = local_sql_syntax( + &valve.db_kind, + &format!(r#"{sql} ORDER BY "table", "row", "column", "message_id""#,), + ); + let mut query = sqlx_query(&sql); + for param in &sql_params { + query = query.bind(param); + } + + let mut row_stream = query.fetch(&valve.pool); + let mut is_first = true; + print!("["); + while let Some(row) = row_stream + .try_next() + .await + .expect("Error fetching row from stream") + { + if !is_first { + print!(","); + } else { + is_first = false; + } + let rn: i64 = row.get::("row"); + let rn = rn as u32; + let mid: i32 = row.get::("message_id"); + let mid = mid as u16; + println!( + "{{\"message_id\":{},\"table\":{},\"row\":{},\"column\":{},\ + \"value\":{},\"level\":{},\"rule\":{},\"message\":{}}}", + mid, + json!(row.get::<&str, _>("table")), + rn, + json!(row.get::<&str, _>("column")), + json!(row.get::<&str, _>("value")), + json!(row.get::<&str, _>("level")), + json!(row.get::<&str, _>("rule")), + json!(row.get::<&str, _>("message")), + ); + } + println!("]"); +} + +/// TODO: Add docstring +pub async fn print_row(cli: &Cli, table: &str, row: u32) { + let valve = build_valve(&cli.source, &cli.database, cli.verbose, cli.assume_yes).await; + let row = valve + .get_row_from_db(table, &row) + .await + .expect("Error getting row"); + println!( + "{}", + json!(row + .to_rich_json() + .expect("Error converting row to rich JSON")) + ); +} + +/// TODO: Add docstring +pub async fn print_rules(cli: &Cli, table: &str, column: &Option) { + let valve = build_valve(&cli.source, &cli.database, cli.verbose, cli.assume_yes).await; + + if !valve.config.table.contains_key(table) { + panic!("No table '{table}'"); + } + if let Some(table_rules) = valve.config.rule.get(table) { + match column { + Some(column) => { + if let Some(column_rules) = table_rules.get(column) { + println!("{}", json!(column_rules)); + } + } + None => println!("{}", json!(table_rules)), + }; + } +} + +/// TODO: Add docstring +pub async fn print_schema(cli: &Cli) { + let valve = build_valve(&cli.source, "", cli.verbose, cli.assume_yes).await; + let schema = valve.dump_schema().await.expect("Error dumping schema"); + println!("{}", schema); +} + +/// TODO: Add docstring +pub async fn print_special(cli: &Cli, table: &Option) { + let valve = build_valve(&cli.source, &cli.database, cli.verbose, cli.assume_yes).await; + + match table { + None => { + println!("Table table name: '{}'", valve.config.special.table); + println!("Column table name: '{}'", valve.config.special.column); + println!("Datatype table name: '{}'", valve.config.special.datatype); + println!("Rule table name: '{}'", valve.config.special.rule); + } + Some(table) => { + let table = table.to_string(); + match table.as_str() { + "table" => println!("{}", valve.config.special.table), + "column" => println!("{}", valve.config.special.column), + "datatype" => println!("{}", valve.config.special.datatype), + "rule" => println!("{}", valve.config.special.rule), + _ => panic!("Not a special table type: '{table}'"), + }; + } + }; +} + +/// TODO: Add docstring +pub async fn print_table(cli: &Cli, table: &str) { + let valve = build_valve(&cli.source, &cli.database, cli.verbose, cli.assume_yes).await; + + let (sql, sql_params) = + generic_select_with_message_values(table, &valve.config, &valve.db_kind); + let sql = local_sql_syntax(&valve.db_kind, &sql); + let mut query = sqlx_query(&sql); + for param in &sql_params { + query = query.bind(param); + } + + let mut row_stream = query.fetch(&valve.pool); + let mut is_first = true; + print!("["); + while let Some(row) = row_stream + .try_next() + .await + .expect("Error fetching row from stream") + { + if !is_first { + print!(","); + } else { + is_first = false; + } + let row = ValveRow::from_any_row(&valve.config, &valve.db_kind, table, &row, &None) + .expect("Error converting to ValveRow"); + println!( + "{}", + json!(row + .to_rich_json() + .expect("Error converting row to rich JSON")) + ); + } + println!("]"); +} + +/// TODO: Add docstring +pub async fn print_table_config(cli: &Cli, table: &str) { + let valve = build_valve(&cli.source, &cli.database, cli.verbose, cli.assume_yes).await; + + let table_config = valve + .config + .table + .get(table) + .expect(&format!("{table} not found")); + println!("{}", json!(table_config)); +} + +/// TODO: Add docstring +pub async fn print_table_order(cli: &Cli) { + let valve = build_valve(&cli.source, &cli.database, cli.verbose, cli.assume_yes).await; + + let sorted_table_list = valve.get_sorted_table_list(); + println!("{}", sorted_table_list.join(", ")); +} + +/// TODO: Add docstring +pub async fn print_value(cli: &Cli, table: &str, row: u32, column: &str) { + let valve = build_valve(&cli.source, &cli.database, cli.verbose, cli.assume_yes).await; + + let cell = valve + .get_cell_from_db(table, &row, column) + .await + .expect("Error getting cell"); + println!("{}", cell.strvalue()); +} + +/// TODO: Add docstring +pub async fn print_valve_config(cli: &Cli) { + let valve = build_valve(&cli.source, &cli.database, cli.verbose, cli.assume_yes).await; + + println!("{}", valve.config) +} + +/// TODO: Add docstring +pub async fn print_history(cli: &Cli, context: usize) { + let valve = build_valve(&cli.source, &cli.database, cli.verbose, cli.assume_yes).await; + let mut undo_history = valve + .get_changes_to_undo(context) + .await + .expect("Error getting changes to undo"); + let next_undo = match undo_history.len() { + 0 => 0, + _ => undo_history[0].history_id, + }; + undo_history.reverse(); + let id_width = next_undo.to_string().len(); + for undo in &undo_history { + if undo.history_id == next_undo { + let line = format!("▲ {:>id_width$} {}", undo.history_id, undo.message); + println!("{}", Style::new().bold().paint(line)); + } else { + println!(" {:>id_width$} {}", undo.history_id, undo.message); + } + } + + let redo_history = valve + .get_changes_to_redo(context) + .await + .expect("Error getting changes to redo"); + let next_redo = match redo_history.len() { + 0 => 0, + _ => redo_history[0].history_id, + }; + let mut highest_encountered_id = 0; + for redo in &redo_history { + if redo.history_id > highest_encountered_id { + highest_encountered_id = redo.history_id; + } + // We do not allow redoing changes that are older than the next record to undo. + // If there are no such changes in the redo stack, then there will be no triangle, + // which indicates that nothing can be redone even though there are entries in the + // redo stack. + if redo.history_id == next_redo && redo.history_id > next_undo { + println!("▼ {:>id_width$} {}", redo.history_id, redo.message); + } else { + let line = format!(" {:>id_width$} {}", redo.history_id, redo.message); + // If the history_id under consideration is lower than the next undo, or if + // there is a redo operation appearing before this one in the returned results + // that has a greater history_id, then this is an orphaned operation that cannot + // be redone. We choose not to include orphaned ops in the history output: + if redo.history_id >= next_undo && redo.history_id >= highest_encountered_id { + println!("{line}"); + } + } + } +} + +/// TODO: Add docstring +pub async fn load_all(cli: &Cli) { + let mut valve = build_valve(&cli.source, &cli.database, cli.verbose, cli.assume_yes).await; + valve + .configure_for_initial_load() + .await + .expect("Could not configure for initial load"); + valve + .load_all_tables(true) + .await + .expect("Error loading tables"); +} + +/// TODO: Add docstring +pub async fn load_table(cli: &Cli, table: &str, initial_load: bool) { + let mut valve = build_valve(&cli.source, &cli.database, cli.verbose, cli.assume_yes).await; + if initial_load { + if valve.db_kind == DbKind::Sqlite && !cli.assume_yes { + print!( + "--initial-load enables options intended for use on an empty database. \ + It should not normally be set when loading a single table as it is \ + unsafe and could result in data corruption in the case of an \ + interrupted transaction. Are you sure you want to continue? [y/N] " + ); + if !proceed::proceed() { + std::process::exit(1); + } + } + valve + .configure_for_initial_load() + .await + .expect("Could not configure for initial load"); + } + valve + .load_tables(&vec![table], true) + .await + .expect("Error loading table"); +} + +/// TODO: Add docstring +pub async fn move_row(cli: &Cli, table: &str, row: u32, after: u32) { + let valve = build_valve(&cli.source, &cli.database, cli.verbose, cli.assume_yes).await; + valve + .move_row(table, &row, &after) + .await + .expect("Error moving row"); +} + +/// TODO: Add docstring +pub async fn undo_or_redo(cli: &Cli) { + let valve = build_valve(&cli.source, &cli.database, cli.verbose, cli.assume_yes).await; + let updated_row = match &cli.command { + Commands::Undo {} => valve.undo().await.expect("Error undoing"), + Commands::Redo {} => valve.redo().await.expect("Error redoing"), + _ => unreachable!(), + }; + if let Some(valve_row) = updated_row { + print!( + "{}", + json!(valve_row + .to_rich_json() + .expect("Error converting row to rich JSON")) + ); + } +} + +/// TODO: Add docstring +pub async fn rename_column( + cli: &Cli, + table: &str, + column: &str, + new_name: &str, + new_label: &Option, + no_load: bool, +) { + let mut valve = build_valve(&cli.source, &cli.database, cli.verbose, cli.assume_yes).await; + valve + .rename_column(table, column, new_name, new_label, no_load) + .await + .expect("Error renaming column"); +} + +/// TODO: Add docstring +pub async fn rename_datatype(cli: &Cli, datatype: &str, new_name: &str) { + let mut valve = build_valve(&cli.source, &cli.database, cli.verbose, cli.assume_yes).await; + valve + .rename_datatype(datatype, new_name) + .await + .expect("Error renaming datatype"); +} + +/// TODO: Add docstring +pub async fn rename_table(cli: &Cli, table: &str, new_name: &str) { + let mut valve = build_valve(&cli.source, &cli.database, cli.verbose, cli.assume_yes).await; + valve + .rename_table(table, new_name) + .await + .expect("Error renaming table"); +} + +/// TODO: Add docstring +pub async fn save(cli: &Cli, tables: &Option>, save_dir: &Option) { + let valve = build_valve(&cli.source, &cli.database, cli.verbose, cli.assume_yes).await; + match tables { + None => { + valve + .save_all_tables(&save_dir) + .await + .expect("Error saving tables"); + } + Some(tables) => { + let tables = tables + .iter() + .filter(|s| *s != "") + .map(|s| s.as_str()) + .collect::>(); + valve + .save_tables(&tables, &save_dir) + .await + .expect("Error saving tables"); + } + }; +} + +/// TODO: Add docstring +pub async fn save_as(cli: &Cli, table: &str, path: &str) { + let valve = build_valve(&cli.source, &cli.database, cli.verbose, cli.assume_yes).await; + valve + .save_table(table, path) + .await + .expect("Error saving table"); +} + +/// TODO: Add docstring +pub async fn test_api(cli: &Cli) { + let valve = build_valve(&cli.source, &cli.database, cli.verbose, cli.assume_yes).await; + run_api_tests(&valve) + .await + .expect("Error running API tests"); +} + +/// TODO: Add docstring +pub async fn test_dt_hierarchy(cli: &Cli) { + let valve = build_valve(&cli.source, &cli.database, cli.verbose, cli.assume_yes).await; + run_dt_hierarchy_tests(&valve).expect("Error running datatype hierarchy tests"); +} + +/// TODO: Add docstring +pub async fn truncate_all_tables(cli: &Cli) { + let mut valve = build_valve(&cli.source, &cli.database, cli.verbose, cli.assume_yes).await; + valve + .truncate_all_tables() + .await + .expect("Error truncating tables"); +} + +/// TODO: Add docstring +pub async fn truncate_table(cli: &Cli, table: &str) { + let mut valve = build_valve(&cli.source, &cli.database, cli.verbose, cli.assume_yes).await; + valve + .truncate_tables(&vec![table]) + .await + .expect("Error truncating table"); +} + +/// TODO: Add docstring +pub async fn update_message( + cli: &Cli, + message_id: u16, + table: &Option, + row: &Option, + column: &Option, +) { + let valve = build_valve(&cli.source, &cli.database, cli.verbose, cli.assume_yes).await; + let json_message = read_json_row_for_table(&valve, "message"); + let (table, row, column, value, level, rule, message) = + extract_message_fields(table, row, column, &json_message); + valve + .update_message( + message_id, &table, row, &column, &value, &level, &rule, &message, + ) + .await + .expect("Error updating message"); +} + +/// TODO: Add docstring +pub async fn update_row(cli: &Cli, table: &str, row: &Option) { + let valve = build_valve(&cli.source, &cli.database, cli.verbose, cli.assume_yes).await; + let mut input_row = read_json_row_for_table(&valve, table); + let input_rn = extract_rn(&mut input_row); + let rn = match input_rn { + Some(input_rn) => match row { + Some(rn) if *rn != input_rn => { + panic!("Mismatch between input row and positional parameter, ROW") + } + None | Some(_) => input_rn, + }, + None => match row { + Some(row) => *row, + None => panic!("No row given"), + }, + }; + let output_row = valve + .update_row(table, &rn, &input_row) + .await + .expect("Error updating row"); + // Print the results to STDOUT: + println!( + "{}", + json!(output_row + .to_rich_json() + .expect("Error converting updated row to rich JSON")) + ); +} + +/// TODO: Add docstring +pub async fn update_value(cli: &Cli, table: &str, row: u32, column: &str, value: &str) { + let valve = build_valve(&cli.source, &cli.database, cli.verbose, cli.assume_yes).await; + + let json_row = fetch_row_with_input_value(&valve, table, row, column, value).await; + let output_row = valve + .update_row(table, &row, &json_row) + .await + .expect("Error updating row"); + // Print the results to STDOUT: + println!( + "{}", + json!(output_row + .to_rich_json() + .expect("Error converting updated row to rich JSON")) + ); +} + +/// TODO: Add docstring +pub async fn validate( + cli: &Cli, + table: &str, + row: &Option, + column: &Option, + value: &Option, +) { + let valve = build_valve(&cli.source, &cli.database, cli.verbose, cli.assume_yes).await; + let (rn, input_row) = match value { + Some(value) => { + let rn = row.expect("No row given"); + let input_row = fetch_row_with_input_value( + &valve, + table, + rn, + match column { + Some(column) => column, + None => panic!("No column given"), + }, + value, + ) + .await; + (Some(rn), input_row) + } + None => { + let mut input_row = read_json_row_for_table(&valve, table); + let rn = extract_rn(&mut input_row); + // If now row was input, default to `row` (which could still be None) + let rn = match rn { + Some(rn) => { + if let Some(row) = row { + if *row != rn { + panic!("Mismatch between input row and positional parameter, ROW") + } + } + Some(rn) + } + None => *row, + }; + (rn, input_row) + } + }; + // Validate the input row: + let output_row = valve + .validate_row(table, &input_row, rn) + .await + .expect("Error validating row"); + + // Print the results to STDOUT: + println!( + "{}", + json!(output_row + .to_rich_json() + .expect("Error converting validated row to rich JSON")) + ); + + // Set the exit status: + let exit_code = output_row.contents.iter().all(|(_, vcell)| vcell.valid); + std::process::exit(match exit_code { + true => 0, + false => 1, + }); +} + /// Prints the table dependencies in either incoming or outgoing order. -pub fn print_dependencies(valve: &Valve, incoming: bool) { +pub async fn print_dependencies(cli: &Cli, incoming: bool) { + let valve = build_valve(&cli.source, &cli.database, cli.verbose, cli.assume_yes).await; let dependencies = valve .collect_dependencies(incoming) .expect("Could not collect dependencies"); @@ -896,7 +1763,7 @@ pub fn extract_datatype_fields( /// Process Valve commands and command-line options. Note that this function will panic if it /// encouters an error. -pub async fn process_command() -> Result<()> { +pub async fn process_command() { let cli = Cli::parse(); // Although Valve::build() will accept a non-TSV argument (in which case that argument is // ignored and a table called 'table' is looked up in the given database instead), we do not @@ -906,59 +1773,19 @@ pub async fn process_command() -> Result<()> { std::process::exit(1); } - // This has to be done multiple times so we declare a closure. We use a closure instead of a - // function so that the cli.verbose and cli.assume_yes fields are in scope: - let build_valve = |source: &str, database: &str| -> Result { - let mut valve = block_on(Valve::build(&source, &database)).expect(BUILD_ERROR); - valve.set_verbose(cli.verbose); - valve.set_interactive(!cli.assume_yes); - Ok(valve) - }; - match &cli.command { Commands::Add { subcommand } => { - let mut valve = build_valve(&cli.source, &cli.database).expect(BUILD_ERROR); match subcommand { AddSubcommands::Column { table, column, no_load, - } => { - let json_row = read_json_row_for_table(&valve, "column"); - let column_json = extract_column_fields(&json_row, table, column); - valve - .add_column(table, column, &column_json, *no_load) - .await?; - } - AddSubcommands::Datatype { datatype } => { - let json_datatype = read_json_row_for_table(&valve, "datatype"); - let dt_fields = extract_datatype_fields(&valve, datatype, &json_datatype); - valve.add_datatype(&dt_fields).await?; - } + } => add_column(&cli, table, column, *no_load).await, + AddSubcommands::Datatype { datatype } => add_datatype(&cli, datatype).await, AddSubcommands::Message { table, row, column } => { - let json_message = read_json_row_for_table(&valve, "message"); - let (table, row, column, value, level, rule, message) = - extract_message_fields(table, row, column, &json_message); - let message_id = valve - .insert_message(&table, row, &column, &value, &level, &rule, &message) - .await?; - println!("{message_id}"); - } - AddSubcommands::Row { table } => { - let json_row = read_json_row_for_table(&valve, table); - let (_, row) = valve - .insert_row(table, &json_row) - .await - .expect("Error inserting row"); - if cli.verbose { - println!( - "{}", - json!(row - .to_rich_json() - .expect("Error converting row to rich JSON")) - ); - } + add_message(&cli, table, row, column).await } + AddSubcommands::Row { table } => add_row(&cli, table).await, AddSubcommands::Table { table, path, @@ -966,490 +1793,75 @@ pub async fn process_command() -> Result<()> { error_rate, seed, no_load, - } => { - valve - .add_table(table, path, sample_size, error_rate, seed, *no_load) - .await?; - } + } => add_table(&cli, table, path, sample_size, error_rate, seed, *no_load).await, }; } - Commands::CreateAll {} => { - let mut valve = build_valve(&cli.source, &cli.database).expect(BUILD_ERROR); - // We turn interactive mode off since this is an "all" operation: - valve.set_interactive(false); - valve - .truncate_all_tables() - .await - .expect("Error truncating tables"); - valve - .ensure_all_tables_created() - .await - .expect("Error ensuring all tables created"); - } + Commands::CreateAll {} => create_all(&cli).await, Commands::Delete { subcommand } => { - let mut valve = build_valve(&cli.source, &cli.database).expect(BUILD_ERROR); match subcommand { DeleteSubcommands::Column { table, column, no_load, - } => { - valve - .delete_column(table, column, *no_load) - .await - .expect("Error deleting column"); - } - DeleteSubcommands::Datatype { datatype } => { - valve - .delete_datatype(datatype) - .await - .expect("Error deleting datatype"); - } + } => delete_column(&cli, table, column, *no_load).await, + DeleteSubcommands::Datatype { datatype } => delete_datatype(&cli, datatype).await, DeleteSubcommands::Messages { message_id, rule } => { - if let Some(message_id) = message_id { - valve - .delete_message(*message_id) - .await - .expect("Could not delete message"); - } else { - match rule { - Some(rule) => valve - .delete_messages_like(rule) - .await - .expect("Could not delete message"), - None => panic!( - "Either a MESSAGE_ID or a RULE (possibly with wildcards) \ - is required. To delete all messages use the option '--rule %'." - ), - } - } - } - DeleteSubcommands::Row { table, rows } => { - for row in rows { - valve - .delete_row(table, row) - .await - .expect("Could not delete row"); - } + delete_messages_by_id_or_rule(&cli, message_id, rule).await } + DeleteSubcommands::Row { table, rows } => delete_rows(&cli, table, rows).await, DeleteSubcommands::Table { table, no_drop } => { - valve - .delete_table(table, *no_drop) - .await - .expect("Error deleting table"); + delete_table(&cli, table, *no_drop).await } }; } - Commands::DropAll {} => { - let mut valve = build_valve(&cli.source, &cli.database).expect(BUILD_ERROR); - valve - .drop_all_tables() - .await - .expect("Error dropping tables"); - } - Commands::Drop { table } => { - let valve = build_valve(&cli.source, &cli.database).expect(BUILD_ERROR); - valve - .drop_tables(&vec![table.as_str()]) - .await - .expect("Error dropping table"); - } + Commands::DropAll {} => drop_all_tables(&cli).await, + Commands::Drop { table } => drop_table(&cli, table).await, Commands::Get { subcommand } => { - let valve = build_valve(&cli.source, &cli.database).expect(BUILD_ERROR); match subcommand { - GetSubcommands::Ancestors { datatype } => { - println!( - "{}", - valve - .get_datatype_ancestor_names(datatype) - .iter() - .map(|name| { - if name.contains(" ") { - format!("'{name}'") - } else { - name.to_string() - } - }) - .collect::>() - .join(" ") - ); - } + GetSubcommands::Ancestors { datatype } => print_ancestors(&cli, datatype).await, GetSubcommands::Cell { table, row, column } => { - let cell = valve - .get_cell_from_db(table, row, column) - .await - .expect("Error getting cell"); - println!( - "{}", - json!(cell - .to_rich_json() - .expect("Error converting cell to rich JSON")) - ); + print_cell(&cli, table, *row, column).await } GetSubcommands::ColumnConfig { table, column } => { - let column_config = valve - .config - .table - .get(table) - .expect(&format!("Table '{table}' not found")) - .column - .get(column) - .expect(&format!("Column '{column}' not found")); - println!("{}", json!(column_config)); - } - GetSubcommands::Constraints { table } => { - let mut table_constraints = HashMap::new(); - table_constraints.insert( - "primary", - json!(valve - .config - .constraint - .primary - .get(table) - .expect(&format!("No table '{table}'"))), - ); - table_constraints.insert( - "unique", - json!(valve - .config - .constraint - .unique - .get(table) - .expect(&format!("No table '{table}'"))), - ); - table_constraints.insert( - "foreign", - json!(valve - .config - .constraint - .foreign - .get(table) - .expect(&format!("No table '{table}'"))), - ); - table_constraints.insert( - "tree", - json!(valve - .config - .constraint - .tree - .get(table) - .expect(&format!("No table '{table}'"))), - ); - println!("{}", json!(table_constraints)); + print_column_config(&cli, table, column).await } + GetSubcommands::Constraints { table } => print_constraints(&cli, table).await, GetSubcommands::DatatypeConfig { datatype } => { - let dt_config = valve - .config - .datatype - .get(datatype) - .expect(&format!("Datatype '{datatype}' not found")); - println!("{}", json!(dt_config)); - } - GetSubcommands::IncomingDeps {} => { - print_dependencies(&valve, true); + print_datatype_config(&cli, datatype).await } + GetSubcommands::IncomingDeps {} => print_dependencies(&cli, true).await, GetSubcommands::Messages { table, row, column, rule, message_id, - } => { - let mut sql = format!( - r#"SELECT "message_id", - "table", "row", "column", "value", "level", "rule", "message" - FROM "message""# - ); - let mut sql_params = vec![]; - match message_id { - Some(message_id) => { - sql.push_str(&format!(r#" WHERE "message_id" = {message_id}"#)); - } - None => { - if let Some(table) = table { - sql.push_str(&format!(r#"WHERE "table" = {SQL_PARAM}"#)); - sql_params.push(table); - } - // The command-line parser will ensure that TABLE has been given - // whenever ROW is given, and that TABLE and ROW have both been given - // whenever COLUMN is given. The case of RULE is different since it is - // a long parameter that is parsed independently. - if let Some(row) = row { - sql.push_str(&format!(r#" AND "row" = {row}"#)); - } - if let Some(column) = column { - sql.push_str(&format!(r#" AND "column" = {SQL_PARAM}"#)); - sql_params.push(column); - } - if let Some(rule) = rule { - sql.push_str(&format!( - r#" {connective} "rule" LIKE {SQL_PARAM}"#, - connective = match table { - None => "WHERE", - Some(_) => "AND", - } - )); - sql_params.push(rule); - } - } - }; - let sql = local_sql_syntax( - &valve.db_kind, - &format!(r#"{sql} ORDER BY "table", "row", "column", "message_id""#,), - ); - let mut query = sqlx_query(&sql); - for param in &sql_params { - query = query.bind(param); - } - - let mut row_stream = query.fetch(&valve.pool); - let mut is_first = true; - print!("["); - while let Some(row) = row_stream.try_next().await? { - if !is_first { - print!(","); - } else { - is_first = false; - } - let rn: i64 = row.get::("row"); - let rn = rn as u32; - let mid: i32 = row.get::("message_id"); - let mid = mid as u16; - println!( - "{{\"message_id\":{},\"table\":{},\"row\":{},\"column\":{},\ - \"value\":{},\"level\":{},\"rule\":{},\"message\":{}}}", - mid, - json!(row.get::<&str, _>("table")), - rn, - json!(row.get::<&str, _>("column")), - json!(row.get::<&str, _>("value")), - json!(row.get::<&str, _>("level")), - json!(row.get::<&str, _>("rule")), - json!(row.get::<&str, _>("message")), - ); - } - println!("]"); - } - GetSubcommands::OutgoingDeps {} => { - print_dependencies(&valve, false); - } - GetSubcommands::Row { table, row } => { - let row = valve - .get_row_from_db(table, row) - .await - .expect("Error getting row"); - println!( - "{}", - json!(row - .to_rich_json() - .expect("Error converting row to rich JSON")) - ); - } - GetSubcommands::Rules { table, column } => { - if !valve.config.table.contains_key(table) { - panic!("No table '{table}'"); - } - if let Some(table_rules) = valve.config.rule.get(table) { - match column { - Some(column) => { - if let Some(column_rules) = table_rules.get(column) { - println!("{}", json!(column_rules)); - } - } - None => println!("{}", json!(table_rules)), - }; - } - } - GetSubcommands::Schema {} => { - let valve = build_valve(&cli.source, "").expect(BUILD_ERROR); - let schema = valve.dump_schema().await.expect("Error dumping schema"); - println!("{}", schema); - } - GetSubcommands::Special { table } => { - match table { - None => { - println!("Table table name: '{}'", valve.config.special.table); - println!("Column table name: '{}'", valve.config.special.column); - println!("Datatype table name: '{}'", valve.config.special.datatype); - println!("Rule table name: '{}'", valve.config.special.rule); - } - Some(table) => { - let table = table.to_string(); - match table.as_str() { - "table" => println!("{}", valve.config.special.table), - "column" => println!("{}", valve.config.special.column), - "datatype" => println!("{}", valve.config.special.datatype), - "rule" => println!("{}", valve.config.special.rule), - _ => panic!("Not a special table type: '{table}'"), - }; - } - }; - } - GetSubcommands::Table { table } => { - let (sql, sql_params) = - generic_select_with_message_values(table, &valve.config, &valve.db_kind); - let sql = local_sql_syntax(&valve.db_kind, &sql); - let mut query = sqlx_query(&sql); - for param in &sql_params { - query = query.bind(param); - } - - let mut row_stream = query.fetch(&valve.pool); - let mut is_first = true; - print!("["); - while let Some(row) = row_stream.try_next().await? { - if !is_first { - print!(","); - } else { - is_first = false; - } - let row = ValveRow::from_any_row( - &valve.config, - &valve.db_kind, - table, - &row, - &None, - ) - .expect("Error converting to ValveRow"); - println!( - "{}", - json!(row - .to_rich_json() - .expect("Error converting row to rich JSON")) - ); - } - println!("]"); - } - GetSubcommands::TableConfig { table } => { - let table_config = valve - .config - .table - .get(table) - .expect(&format!("{table} not found")); - println!("{}", json!(table_config)); - } - GetSubcommands::TableOrder {} => { - let sorted_table_list = valve.get_sorted_table_list(); - println!("{}", sorted_table_list.join(", ")); - } + } => print_messages(&cli, table, row, column, rule, message_id).await, + GetSubcommands::OutgoingDeps {} => print_dependencies(&cli, false).await, + GetSubcommands::Row { table, row } => print_row(&cli, table, *row).await, + GetSubcommands::Rules { table, column } => print_rules(&cli, table, column).await, + GetSubcommands::Schema {} => print_schema(&cli).await, + GetSubcommands::Special { table } => print_special(&cli, table).await, + GetSubcommands::Table { table } => print_table(&cli, table).await, + GetSubcommands::TableConfig { table } => print_table_config(&cli, table).await, + GetSubcommands::TableOrder {} => print_table_order(&cli).await, GetSubcommands::Value { table, row, column } => { - let cell = valve - .get_cell_from_db(table, row, column) - .await - .expect("Error getting cell"); - println!("{}", cell.strvalue()); - } - GetSubcommands::ValveConfig {} => { - println!("{}", valve.config) + print_value(&cli, table, *row, column).await } + GetSubcommands::ValveConfig {} => print_valve_config(&cli).await, }; } - Commands::History { context } => { - let valve = build_valve(&cli.source, &cli.database).expect(BUILD_ERROR); - let mut undo_history = valve.get_changes_to_undo(*context).await?; - let next_undo = match undo_history.len() { - 0 => 0, - _ => undo_history[0].history_id, - }; - undo_history.reverse(); - let id_width = next_undo.to_string().len(); - for undo in &undo_history { - if undo.history_id == next_undo { - let line = format!("▲ {:>id_width$} {}", undo.history_id, undo.message); - println!("{}", Style::new().bold().paint(line)); - } else { - println!(" {:>id_width$} {}", undo.history_id, undo.message); - } - } - - let redo_history = valve.get_changes_to_redo(*context).await?; - let next_redo = match redo_history.len() { - 0 => 0, - _ => redo_history[0].history_id, - }; - let mut highest_encountered_id = 0; - for redo in &redo_history { - if redo.history_id > highest_encountered_id { - highest_encountered_id = redo.history_id; - } - // We do not allow redoing changes that are older than the next record to undo. - // If there are no such changes in the redo stack, then there will be no triangle, - // which indicates that nothing can be redone even though there are entries in the - // redo stack. - if redo.history_id == next_redo && redo.history_id > next_undo { - println!("▼ {:>id_width$} {}", redo.history_id, redo.message); - } else { - let line = format!(" {:>id_width$} {}", redo.history_id, redo.message); - // If the history_id under consideration is lower than the next undo, or if - // there is a redo operation appearing before this one in the returned results - // that has a greater history_id, then this is an orphaned operation that cannot - // be redone. We choose not to include orphaned ops in the history output: - if redo.history_id >= next_undo && redo.history_id >= highest_encountered_id { - println!("{line}"); - } - } - } - } - Commands::LoadAll {} => { - let mut valve = build_valve(&cli.source, &cli.database).expect(BUILD_ERROR); - valve - .configure_for_initial_load() - .await - .expect("Could not configure for initial load"); - valve - .load_all_tables(true) - .await - .expect("Error loading tables"); - } + Commands::History { context } => print_history(&cli, *context).await, + Commands::LoadAll {} => load_all(&cli).await, Commands::Load { initial_load, table, - } => { - let mut valve = build_valve(&cli.source, &cli.database).expect(BUILD_ERROR); - if *initial_load { - if valve.db_kind == DbKind::Sqlite && !cli.assume_yes { - print!( - "--initial-load enables options intended for use on an empty database. \ - It should not normally be set when loading a single table as it is \ - unsafe and could result in data corruption in the case of an \ - interrupted transaction. Are you sure you want to continue? [y/N] " - ); - if !proceed::proceed() { - std::process::exit(1); - } - } - valve - .configure_for_initial_load() - .await - .expect("Could not configure for initial load"); - } - valve - .load_tables(&vec![table.as_str()], true) - .await - .expect("Error loading table"); - } - Commands::Move { table, row, after } => { - let valve = build_valve(&cli.source, &cli.database).expect(BUILD_ERROR); - valve.move_row(table, row, after).await?; - } + } => load_table(&cli, table, *initial_load).await, + Commands::Move { table, row, after } => move_row(&cli, table, *row, *after).await, Commands::Redo {} | Commands::Undo {} => { - let valve = build_valve(&cli.source, &cli.database).expect(BUILD_ERROR); - let updated_row = match &cli.command { - Commands::Undo {} => valve.undo().await?, - Commands::Redo {} => valve.redo().await?, - _ => unreachable!(), - }; - if let Some(valve_row) = updated_row { - print!( - "{}", - json!(valve_row - .to_rich_json() - .expect("Error converting row to rich JSON")) - ); - } + undo_or_redo(&cli).await; } Commands::Rename { subcommand } => { - let mut valve = build_valve(&cli.source, &cli.database).expect(BUILD_ERROR); match subcommand { RenameSubcommands::Column { table, @@ -1457,144 +1869,55 @@ pub async fn process_command() -> Result<()> { new_name, new_label, no_load, - } => { - valve - .rename_column(table, column, new_name, new_label, *no_load) - .await - .expect("Error renaming column"); - } + } => rename_column(&cli, table, column, new_name, new_label, *no_load).await, RenameSubcommands::Datatype { datatype, new_name } => { - valve - .rename_datatype(datatype, new_name) - .await - .expect("Error renaming datatype"); + rename_datatype(&cli, datatype, new_name).await } RenameSubcommands::Table { table, new_name } => { - valve - .rename_table(table, new_name) - .await - .expect("Error renaming table"); + rename_table(&cli, table, new_name).await } }; } Commands::Save { save_dir, tables } => { - let valve = build_valve(&cli.source, &cli.database).expect(BUILD_ERROR); - match tables { - None => { - valve - .save_all_tables(&save_dir) - .await - .expect("Error saving tables"); - } - Some(tables) => { - let tables = tables - .iter() - .filter(|s| *s != "") - .map(|s| s.as_str()) - .collect::>(); - valve - .save_tables(&tables, &save_dir) - .await - .expect("Error saving tables"); - } - }; + save(&cli, tables, save_dir).await; } Commands::SaveAs { table, path } => { - let valve = build_valve(&cli.source, &cli.database).expect(BUILD_ERROR); - valve.save_table(table, path).await?; + save_as(&cli, table, path).await; } Commands::TestApi {} => { - let valve = build_valve(&cli.source, &cli.database).expect(BUILD_ERROR); - run_api_tests(&valve) - .await - .expect("Error running API tests"); + test_api(&cli).await; } Commands::TestDtHierarchy {} => { - let valve = build_valve(&cli.source, &cli.database).expect(BUILD_ERROR); - run_dt_hierarchy_tests(&valve).expect("Error running datatype hierarchy tests"); + test_dt_hierarchy(&cli).await; } Commands::TruncateAll {} => { - let mut valve = build_valve(&cli.source, &cli.database).expect(BUILD_ERROR); - valve - .truncate_all_tables() - .await - .expect("Error truncating tables"); + truncate_all_tables(&cli).await; } Commands::Truncate { table } => { - let mut valve = build_valve(&cli.source, &cli.database).expect(BUILD_ERROR); - valve - .truncate_tables(&vec![table.as_str()]) - .await - .expect("Error truncating table"); + truncate_table(&cli, table).await; } Commands::Update { subcommand } => { - let valve = build_valve(&cli.source, &cli.database).expect(BUILD_ERROR); - if let UpdateSubcommands::Message { - message_id, - table, - row, - column, - } = subcommand - { - let valve = build_valve(&cli.source, &cli.database).expect(BUILD_ERROR); - let json_message = read_json_row_for_table(&valve, "message"); - let (table, row, column, value, level, rule, message) = - extract_message_fields(table, row, column, &json_message); - valve - .update_message( - *message_id, - &table, - row, - &column, - &value, - &level, - &rule, - &message, - ) - .await?; - } else { - let (table, row_number, input_row) = match subcommand { - UpdateSubcommands::Row { table, row } => { - let mut json_row = read_json_row_for_table(&valve, table); - let input_rn = extract_rn(&mut json_row); - let row = match input_rn { - Some(input_rn) => match row { - Some(row) if *row != input_rn => panic!( - "Mismatch between input row and positional parameter, ROW" - ), - None | Some(_) => input_rn, - }, - None => match row { - Some(row) => *row, - None => panic!("No row given"), - }, - }; - (table, Some(row), json_row) - } - UpdateSubcommands::Value { - table, - row, - column, - value, - } => { - let json_row = - fetch_row_with_input_value(&valve, table, *row, column, value).await; - (table, Some(*row), json_row) - } - UpdateSubcommands::Message { .. } => unreachable!(), - }; - let output_row = match row_number { - None => panic!("A row number must be specified."), - Some(rn) => valve.update_row(table, &rn, &input_row).await?, - }; - // Print the results to STDOUT: - println!( - "{}", - json!(output_row - .to_rich_json() - .expect("Error converting updated row to rich JSON")) - ); - } + match subcommand { + UpdateSubcommands::Message { + message_id, + table, + row, + column, + } => { + update_message(&cli, *message_id, table, row, column).await; + } + UpdateSubcommands::Row { table, row } => { + update_row(&cli, table, row).await; + } + UpdateSubcommands::Value { + table, + row, + column, + value, + } => { + update_value(&cli, table, *row, column, value).await; + } + }; } Commands::Validate { table, @@ -1602,62 +1925,7 @@ pub async fn process_command() -> Result<()> { column, value, } => { - let valve = build_valve(&cli.source, &cli.database).expect(BUILD_ERROR); - let (rn, input_row) = match value { - Some(value) => { - let rn = row.expect("No row given"); - let input_row = fetch_row_with_input_value( - &valve, - table, - rn, - match column { - Some(column) => column, - None => panic!("No column given"), - }, - value, - ) - .await; - (Some(rn), input_row) - } - None => { - let mut input_row = read_json_row_for_table(&valve, table); - let rn = extract_rn(&mut input_row); - // If now row was input, default to `row` (which could still be None) - let rn = match rn { - Some(rn) => { - if let Some(row) = row { - if *row != rn { - panic!( - "Mismatch between input row and positional parameter, ROW" - ) - } - } - Some(rn) - } - None => *row, - }; - (rn, input_row) - } - }; - // Validate the input row: - let output_row = valve.validate_row(table, &input_row, rn).await?; - - // Print the results to STDOUT: - println!( - "{}", - json!(output_row - .to_rich_json() - .expect("Error converting validated row to rich JSON")) - ); - - // Set the exit status: - let exit_code = output_row.contents.iter().all(|(_, vcell)| vcell.valid); - std::process::exit(match exit_code { - true => 0, - false => 1, - }); + validate(&cli, table, row, column, value).await; } } - - Ok(()) } diff --git a/src/main.rs b/src/main.rs index 56b91ea..b58166e 100644 --- a/src/main.rs +++ b/src/main.rs @@ -3,5 +3,6 @@ use ontodev_valve::cli; #[async_std::main] async fn main() -> Result<()> { - cli::process_command().await + cli::process_command().await; + Ok(()) }