Skip to content

Commit

Permalink
Merge pull request #141 from samply/develop
Browse files Browse the repository at this point in the history
2.5 months worth of changes
  • Loading branch information
enola-dkfz authored May 17, 2024
2 parents a952adb + c316c50 commit 50ab353
Show file tree
Hide file tree
Showing 8 changed files with 171 additions and 181 deletions.
5 changes: 3 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,15 @@ laplace_rs = {version = "0.2.0", git = "https://github.com/samply/laplace-rs.git

# Logging
tracing = { version = "0.1.37", default_features = false }
tracing-subscriber = { version = "0.3.11", default_features = false, features = ["env-filter", "fmt"] }
tracing-subscriber = { version = "0.3.11", default_features = false, features = ["env-filter", "ansi"] }

# Global variables
once_cell = "1.18"

# Command Line Interface
clap = { version = "4.0", default_features = false, features = ["std", "env", "derive", "help"] }
clap = { version = "4", default_features = false, features = ["std", "env", "derive", "help", "color"] }
rand = { default-features = false, version = "0.8.5" }
futures-util = { version = "0.3", default-features = false, features = ["std"] }

[dev-dependencies]
tokio-test = "0.4.2"
Expand Down
1 change: 0 additions & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -16,4 +16,3 @@ FROM gcr.io/distroless/cc-debian12
ARG COMPONENT
COPY --from=chmodder /app/$COMPONENT /usr/local/bin/samply
ENTRYPOINT [ "/usr/local/bin/focus" ]

1 change: 1 addition & 0 deletions dev/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ services:
BEAM_PROXY_URL: http://proxy1:8081
RETRY_COUNT: 30
OBFUSCATE: "no"
RUST_LOG: "debug,hyper=info"
blaze:
image: samply/blaze
volumes:
Expand Down
2 changes: 1 addition & 1 deletion dev/focusdev
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ function build() {
function build_docker() {
BACK2=$(pwd)
cd $SD
docker compose build --build-arg TARGETARCH=$ARCH
docker compose build --build-arg TARGETARCH=$ARCH --build-arg COMPONENT=focus
cd $BACK2
}

Expand Down
2 changes: 2 additions & 0 deletions resources/cql/ITCC_STRAT_AGE_CLASS_STRATIFIER
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
define function DiagnosisAge(condition FHIR.Condition):
condition.onset.value
2 changes: 2 additions & 0 deletions resources/cql/PRISM_STRAT_AGE_STRATIFIER_BBMRI
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
define AgeClass:
(AgeInYears())
152 changes: 101 additions & 51 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,17 +12,21 @@ mod intermediate_rep;
mod task_processing;
mod util;

use base64::engine::general_purpose;
use base64::{engine::general_purpose::STANDARD as BASE64, Engine as _};
use beam_lib::{MsgId, TaskRequest, TaskResult};
use beam_lib::{TaskRequest, TaskResult};
use futures_util::future::BoxFuture;
use futures_util::FutureExt;
use laplace_rs::ObfCache;
use task_processing::TaskQueue;
use tokio::sync::Mutex;

use crate::blaze::parse_blaze_query;
use crate::config::EndpointType;
use crate::util::{is_cql_tampered_with, obfuscate_counts_mr};
use crate::{config::CONFIG, errors::FocusError};
use blaze::CqlQuery;

use std::collections::{HashMap, HashSet};
use std::collections::HashMap;
use std::ops::DerefMut;
use std::process::ExitCode;
use std::str;
Expand Down Expand Up @@ -103,64 +107,110 @@ pub async fn main() -> ExitCode {
}

async fn main_loop() -> ExitCode {
// TODO: The report cache init should be an fn on the cache
let report_cache: ReportCache = ReportCache::new();

let mut seen_tasks = Default::default();
let mut task_queue = task_processing::spawn_task_workers(report_cache);
let endpoint_service_available: fn() -> BoxFuture<'static, bool> = match CONFIG.endpoint_type {
EndpointType::Blaze => || blaze::check_availability().boxed(),
EndpointType::Omop => || async { true }.boxed(), // TODO health check
};
let mut failures = 0;
while failures < CONFIG.retry_count {
if failures > 0 {
warn!(
"Retrying connection (attempt {}/{})",
failures + 1,
while !(beam::check_availability().await && endpoint_service_available().await) {
failures += 1;
if failures >= CONFIG.retry_count {
error!(
"Encountered too many errors -- exiting after {} attempts.",
CONFIG.retry_count
);
tokio::time::sleep(Duration::from_secs(2)).await;
return ExitCode::from(22);
}
if !(beam::check_availability().await) {
failures += 1;
}
if CONFIG.endpoint_type == config::EndpointType::Blaze {
if !(blaze::check_availability().await) {
failures += 1;
}
} else if CONFIG.endpoint_type == config::EndpointType::Omop {
tokio::time::sleep(Duration::from_secs(2)).await;
warn!(
"Retrying connection (attempt {}/{})",
failures,
CONFIG.retry_count
);
};
let report_cache = Arc::new(Mutex::new(ReportCache::new()));
let obf_cache = Arc::new(Mutex::new(ObfCache {
cache: Default::default(),
}));
task_processing::process_tasks(move |task| {
let obf_cache = obf_cache.clone();
let report_cache = report_cache.clone();
process_task(task, obf_cache, report_cache).boxed_local()
}).await;
ExitCode::FAILURE
}

//TODO health check
}
async fn process_task(
task: &BeamTask,
obf_cache: Arc<Mutex<ObfCache>>,
report_cache: Arc<Mutex<ReportCache>>,
) -> Result<BeamResult, FocusError> {
debug!("Processing task {}", task.id);

if let Err(e) = process_tasks(&mut task_queue, &mut seen_tasks).await {
warn!("Encountered the following error, while processing tasks: {e}");
failures += 1;
} else {
failures = 0;
}
let metadata: Metadata = serde_json::from_value(task.metadata.clone()).unwrap_or(Metadata {
project: "default_obfuscation".to_string(),
execute: true,
});

if metadata.project == "focus-healthcheck" {
return Ok(beam::beam_result::succeeded(
CONFIG.beam_app_id_long.clone(),
vec![task.from.clone()],
task.id,
"healthy".into()
));
}
error!(
"Encountered too many errors -- exiting after {} attempts.",
CONFIG.retry_count
);
ExitCode::from(22)
}

async fn process_tasks(
task_queue: &mut TaskQueue,
seen: &mut HashSet<MsgId>,
) -> Result<(), FocusError> {
debug!("Start processing tasks...");
let tasks = beam::retrieve_tasks().await?;
for task in tasks {
if seen.contains(&task.id) {
continue;
if metadata.project == "exporter" {
let body = &task.body;
return Ok(run_exporter_query(task, body, metadata.execute).await)?;
}

if CONFIG.endpoint_type == EndpointType::Blaze {
let query = parse_blaze_query(task)?;
if query.lang == "cql" {
// TODO: Change query.lang to an enum

Ok(run_cql_query(task, &query, obf_cache, report_cache, metadata.project).await)?
} else {
warn!("Can't run queries with language {} in Blaze", query.lang);
Ok(beam::beam_result::perm_failed(
CONFIG.beam_app_id_long.clone(),
vec![task.from.clone()],
task.id,
format!(
"Can't run queries with language {} and/or endpoint type {}",
query.lang, CONFIG.endpoint_type
),
))
}
seen.insert(task.id);
task_queue
.send(task)
.await
.expect("Receiver is never dropped");
} else if CONFIG.endpoint_type == EndpointType::Omop {
let decoded = util::base64_decode(&task.body)?;
let intermediate_rep_query: intermediate_rep::IntermediateRepQuery =
serde_json::from_slice(&decoded).map_err(|e| FocusError::ParsingError(e.to_string()))?;
//TODO check that the language is ast
let query_decoded = general_purpose::STANDARD
.decode(intermediate_rep_query.query)
.map_err(FocusError::DecodeError)?;
let ast: ast::Ast =
serde_json::from_slice(&query_decoded).map_err(|e| FocusError::ParsingError(e.to_string()))?;

Ok(run_intermediate_rep_query(task, ast).await)?
} else {
warn!(
"Can't run queries with endpoint type {}",
CONFIG.endpoint_type
);
Ok(beam::beam_result::perm_failed(
CONFIG.beam_app_id_long.clone(),
vec![task.from.clone()],
task.id,
format!(
"Can't run queries with endpoint type {}",
CONFIG.endpoint_type
),
))
}
Ok(())
}

async fn run_cql_query(
Expand Down
Loading

0 comments on commit 50ab353

Please sign in to comment.