diff --git a/glados-audit/src/selection.rs b/glados-audit/src/selection.rs index aea5954e..c471d84e 100644 --- a/glados-audit/src/selection.rs +++ b/glados-audit/src/selection.rs @@ -14,7 +14,7 @@ use tracing::{debug, error, warn}; use entity::{ content::{self, Model}, - content_audit, + content_audit::{self, AuditResult}, }; /// Interval between audit selections for a particular strategy. @@ -55,7 +55,7 @@ impl SelectionStrategy { match self { SelectionStrategy::Latest => select_latest_content_for_audit(tx, conn).await, SelectionStrategy::Random => select_random_content_for_audit(tx, conn).await, - SelectionStrategy::Failed => warn!("Need to implement SelectionStrategy::Failed"), + SelectionStrategy::Failed => select_failed_content_for_audit(tx, conn).await, SelectionStrategy::OldestMissing => { warn!("Need to implement SelectionStrategy::OldestMissing") } @@ -105,6 +105,55 @@ async fn select_latest_content_for_audit( } } +/// Finds and sends audit tasks for [SelectionStrategy::Failed]. +/// +/// Strategy achieved by: +/// 1. Look in audit table, filter for Failures, then sort oldest-first +/// 2. Left join with the content table using find_with_related(). +async fn select_failed_content_for_audit( + tx: mpsc::Sender, + conn: DatabaseConnection, +) -> ! { + debug!("initializing audit process for 'failed' strategy"); + + let mut interval = interval(Duration::from_secs(AUDIT_SELECTION_PERIOD_SECONDS)); + loop { + interval.tick().await; + if tx.is_closed() { + error!("Channel is closed."); + panic!(); + } + let search_result: Vec<(content_audit::Model, Vec)> = + match content_audit::Entity::find() + .filter(content_audit::Column::Result.eq(AuditResult::Failure)) + .order_by_asc(content_audit::Column::CreatedAt) + .find_with_related(content::Entity) + .limit(KEYS_PER_PERIOD) + .all(&conn) + .await + { + Ok(result) => result, + Err(err) => { + error!(audit.strategy="failed", err=?err, "Could not make audit query"); + continue; + } + }; + // The search returns multiple possible content for each audit. + // Yet, we know there will be one content because audits only have one one content foreign key. + let content_key_db_entries: Vec = search_result + .into_iter() + .filter_map(|(_audit, contents)| contents.into_iter().next()) + .collect(); + + let item_count = content_key_db_entries.len(); + debug!( + strategy = "failed", + item_count, "Adding content keys to the audit queue." + ); + add_to_queue(tx.clone(), content_key_db_entries).await; + } +} + /// Adds Glados database History sub-protocol search results /// to a channel for auditing against a Portal Node. async fn add_to_queue(tx: mpsc::Sender, items: Vec) { @@ -324,4 +373,35 @@ mod tests { // Make sure no key was audited twice by pushing to a hashmap and checking it's length. assert_eq!(checked_ids.len(), KEYS_PER_PERIOD as usize); } + + /// Tests that the `SelectionStrategy::Failed` selects the correct values + /// from the test database. + #[tokio::test] + async fn test_failed_strategy() { + // Orchestration + let conn = get_populated_test_audit_db().await.unwrap(); + let (tx, mut rx) = channel::(100); + // Start strategy + tokio::spawn(select_failed_content_for_audit(tx.clone(), conn.clone())); + let mut checked_ids: HashSet = HashSet::new(); + // There are 7 correct values: [17, 19, ..., 27, 29] + let expected_key_ids: Vec = vec![17, 19, 21, 23, 25, 27, 29]; + // Await strategy results + while let Some(key) = rx.recv().await { + let key_model = content::Entity::find() + .filter(content::Column::ContentKey.eq(key.to_bytes())) + .one(&conn) + .await + .unwrap() + .unwrap(); + // Check that strategy only yields expected keys. + assert!(expected_key_ids.contains(&key_model.id)); + checked_ids.insert(key_model.id); + if checked_ids.len() == expected_key_ids.len() as usize { + break; + } + } + // Make sure no key was audited twice by pushing to a hashmap and checking it's length. + assert_eq!(checked_ids.len(), expected_key_ids.len() as usize); + } }