Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

implement failed strategy #77

Draft
wants to merge 1 commit into
base: master
Choose a base branch
from
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
84 changes: 82 additions & 2 deletions glados-audit/src/selection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use tracing::{debug, error, warn};

use entity::{
content::{self, Model},
content_audit,
content_audit::{self, AuditResult},
};

/// Interval between audit selections for a particular strategy.
Expand Down Expand Up @@ -55,7 +55,7 @@ impl SelectionStrategy {
match self {
SelectionStrategy::Latest => select_latest_content_for_audit(tx, conn).await,
SelectionStrategy::Random => select_random_content_for_audit(tx, conn).await,
SelectionStrategy::Failed => warn!("Need to implement SelectionStrategy::Failed"),
SelectionStrategy::Failed => select_failed_content_for_audit(tx, conn).await,
SelectionStrategy::OldestMissing => {
warn!("Need to implement SelectionStrategy::OldestMissing")
}
Expand Down Expand Up @@ -105,6 +105,55 @@ async fn select_latest_content_for_audit(
}
}

/// Finds and sends audit tasks for [SelectionStrategy::Failed].
///
/// Strategy achieved by:
/// 1. Look in audit table, filter for Failures, then sort oldest-first
/// 2. Left join with the content table using find_with_related().
async fn select_failed_content_for_audit(
tx: mpsc::Sender<HistoryContentKey>,
conn: DatabaseConnection,
) -> ! {
debug!("initializing audit process for 'failed' strategy");

let mut interval = interval(Duration::from_secs(AUDIT_SELECTION_PERIOD_SECONDS));
loop {
interval.tick().await;
if tx.is_closed() {
error!("Channel is closed.");
panic!();
}
let search_result: Vec<(content_audit::Model, Vec<content::Model>)> =
match content_audit::Entity::find()
.filter(content_audit::Column::Result.eq(AuditResult::Failure))
.order_by_asc(content_audit::Column::CreatedAt)
.find_with_related(content::Entity)
.limit(KEYS_PER_PERIOD)
.all(&conn)
.await
{
Ok(result) => result,
Err(err) => {
error!(audit.strategy="failed", err=?err, "Could not make audit query");
continue;
}
};
// The search returns multiple possible content for each audit.
// Yet, we know there will be one content because audits only have one one content foreign key.
let content_key_db_entries: Vec<content::Model> = search_result
.into_iter()
.filter_map(|(_audit, contents)| contents.into_iter().next())
.collect();

let item_count = content_key_db_entries.len();
debug!(
strategy = "failed",
item_count, "Adding content keys to the audit queue."
);
add_to_queue(tx.clone(), content_key_db_entries).await;
}
}

/// Adds Glados database History sub-protocol search results
/// to a channel for auditing against a Portal Node.
async fn add_to_queue(tx: mpsc::Sender<HistoryContentKey>, items: Vec<Model>) {
Expand Down Expand Up @@ -324,4 +373,35 @@ mod tests {
// Make sure no key was audited twice by pushing to a hashmap and checking it's length.
assert_eq!(checked_ids.len(), KEYS_PER_PERIOD as usize);
}

/// Tests that the `SelectionStrategy::Failed` selects the correct values
/// from the test database.
#[tokio::test]
async fn test_failed_strategy() {
// Orchestration
let conn = get_populated_test_audit_db().await.unwrap();
let (tx, mut rx) = channel::<HistoryContentKey>(100);
// Start strategy
tokio::spawn(select_failed_content_for_audit(tx.clone(), conn.clone()));
let mut checked_ids: HashSet<i32> = HashSet::new();
// There are 7 correct values: [17, 19, ..., 27, 29]
let expected_key_ids: Vec<i32> = vec![17, 19, 21, 23, 25, 27, 29];
// Await strategy results
while let Some(key) = rx.recv().await {
let key_model = content::Entity::find()
.filter(content::Column::ContentKey.eq(key.to_bytes()))
.one(&conn)
.await
.unwrap()
.unwrap();
// Check that strategy only yields expected keys.
assert!(expected_key_ids.contains(&key_model.id));
checked_ids.insert(key_model.id);
if checked_ids.len() == expected_key_ids.len() as usize {
break;
}
}
// Make sure no key was audited twice by pushing to a hashmap and checking it's length.
assert_eq!(checked_ids.len(), expected_key_ids.len() as usize);
}
}