Skip to content

Commit

Permalink
Merge pull request #84 from samply/develop
Browse files Browse the repository at this point in the history
clippy style, CQL string literal
  • Loading branch information
enola-dkfz authored Dec 6, 2023
2 parents cc693c9 + 03c7cf0 commit 5113864
Show file tree
Hide file tree
Showing 6 changed files with 45 additions and 65 deletions.
2 changes: 1 addition & 1 deletion dev/focusdev
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ function build() {
function build_docker() {
BACK2=$(pwd)
cd $SD
docker-compose build --build-arg TARGETARCH=$ARCH
docker compose build --build-arg TARGETARCH=$ARCH
cd $BACK2
}

Expand Down
2 changes: 1 addition & 1 deletion src/beam.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use std::time::Duration;

use beam_lib::{TaskResult, BeamClient, BlockingOptions, MsgId, TaskRequest, RawString};
use once_cell::sync::Lazy;
use serde::{de::DeserializeOwned, Serialize};
use serde::Serialize;
use tracing::{debug, warn};

use crate::{config::CONFIG, errors::FocusError};
Expand Down
13 changes: 6 additions & 7 deletions src/blaze.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ pub async fn post_library(library: String) -> Result<(), FocusError> {
.body(library)
.send()
.await
.map_err(|e| FocusError::UnableToPostLibrary(e))?;
.map_err(FocusError::UnableToPostLibrary)?;

if resp.status() == StatusCode::CREATED {
debug!("Successfully created a Library");
Expand All @@ -66,7 +66,7 @@ pub async fn post_measure(measure: String) -> Result<(), FocusError> {
.body(measure)
.send()
.await
.map_err(|e| FocusError::UnableToPostMeasure(e))?;
.map_err(FocusError::UnableToPostMeasure)?;

if resp.status() == StatusCode::CREATED {
debug!("Successfully created a Measure");
Expand All @@ -88,7 +88,7 @@ pub async fn evaluate_measure(url: String) -> Result<String, FocusError> {
))
.send()
.await
.map_err(|e| FocusError::MeasureEvaluationErrorReqwest(e))?;
.map_err(FocusError::MeasureEvaluationErrorReqwest)?;

if resp.status() == StatusCode::OK {
debug!(
Expand All @@ -98,7 +98,7 @@ pub async fn evaluate_measure(url: String) -> Result<String, FocusError> {
resp
.text()
.await
.map_err(|e| FocusError::MeasureEvaluationErrorReqwest(e))
.map_err( FocusError::MeasureEvaluationErrorReqwest)
} else {
warn!(
"Error while evaluating the Measure with canonical URL `{}`: {:?}",
Expand All @@ -110,14 +110,13 @@ pub async fn evaluate_measure(url: String) -> Result<String, FocusError> {

pub async fn run_cql_query(library: &Value, measure: &Value) -> Result<String, FocusError> {
let url: String = if let Ok(value) = get_json_field(&measure.to_string(), "url") {
value.to_string().replace("\"", "")
value.to_string().replace('"', "")
} else {
return Err(FocusError::CQLQueryError());
};
debug!("Evaluating the Measure with canonical URL: {}", url);

post_library(library.to_string()).await?; //TODO make it with into or could change the function signature to take the library
post_measure(measure.to_string()).await?; //ditto &str
let result_evaluation = evaluate_measure(url).await;
return result_evaluation;
evaluate_measure(url).await
}
8 changes: 4 additions & 4 deletions src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ impl Config {
delta_medication_statements: cli_args.delta_medication_statements,
epsilon: cli_args.epsilon,
rounding_step: cli_args.rounding_step,
unobfuscated: cli_args.projects_no_obfuscation.split(";").map(|s| s.to_string()).collect(),
unobfuscated: cli_args.projects_no_obfuscation.split(';').map(|s| s.to_string()).collect(),
queries_to_cache_file_path: cli_args.queries_to_cache_file_path,
tls_ca_certificates,
client,
Expand Down Expand Up @@ -206,17 +206,17 @@ pub fn prepare_reqwest_client(certs: &Vec<Certificate>) -> Result<reqwest::Clien
match k.as_str() {
"http_proxy" => proxies.push(
Proxy::http(v)
.map_err(|e| FocusError::InvalidProxyConfig(e))?
.map_err(FocusError::InvalidProxyConfig)?
.no_proxy(no_proxy.clone()),
),
"https_proxy" => proxies.push(
Proxy::https(v)
.map_err(|e| FocusError::InvalidProxyConfig(e))?
.map_err(FocusError::InvalidProxyConfig)?
.no_proxy(no_proxy.clone()),
),
"all_proxy" => proxies.push(
Proxy::all(v)
.map_err(|e| FocusError::InvalidProxyConfig(e))?
.map_err( FocusError::InvalidProxyConfig)?
.no_proxy(no_proxy.clone()),
),
_ => (),
Expand Down
61 changes: 26 additions & 35 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,28 +77,26 @@ async fn main_loop() -> ExitCode {
let mut report_cache: ReportCache = ReportCache {
cache: HashMap::new(),
};


match CONFIG.queries_to_cache_file_path.clone() {
Some(filename) => {
let lines = util::read_lines(filename.clone().to_string());

match lines {
Ok(ok_lines) => {
for line in ok_lines {
let Ok(ok_line) = line else{
warn!("A line in the file {} is not readable", filename);
continue;
};
report_cache.cache.insert(ok_line, ("".into(), UNIX_EPOCH));
}
}
Err(_) => {
error!("The file {} cannot be opened", filename);
exit(2);
if let Some(filename) = CONFIG.queries_to_cache_file_path.clone() {

let lines = util::read_lines(filename.clone().to_string());
match lines {
Ok(ok_lines) => {
for line in ok_lines {
let Ok(ok_line) = line else{
warn!("A line in the file {} is not readable", filename);
continue;
};
report_cache.cache.insert(ok_line, ("".into(), UNIX_EPOCH));
}
}
Err(_) => {
error!("The file {} cannot be opened", filename);
exit(2);
}
}
None => {}
}

let mut failures = 0;
Expand Down Expand Up @@ -216,8 +214,8 @@ async fn process_tasks(

fn parse_query(task: &BeamTask) -> Result<blaze::Query, FocusError> {
let decoded = general_purpose::STANDARD
.decode(task.body.to_owned())
.map_err(|e| FocusError::DecodeError(e))?;
.decode(&task.body)
.map_err(FocusError::DecodeError)?;

let query: blaze::Query =
from_slice(&decoded).map_err(|e| FocusError::ParsingError(e.to_string()))?;
Expand All @@ -236,14 +234,14 @@ async fn run_query(

if query.lang == "cql" {
// TODO: Change query.lang to an enum
return Ok(run_cql_query(task, query, obf_cache, report_cache, project).await)?;
Ok(run_cql_query(task, query, obf_cache, report_cache, project).await)?
} else {
return Ok(beam::beam_result::perm_failed(
Ok(beam::beam_result::perm_failed(
CONFIG.beam_app_id_long.clone(),
vec![task.from.clone()],
task.id,
format!("Can't run inqueries with language {}", query.lang),
));
))
}
}

Expand All @@ -255,19 +253,12 @@ async fn run_cql_query(
project: String
) -> Result<BeamResult, FocusError> {

let mut err = beam::beam_result::perm_failed(
CONFIG.beam_app_id_long.clone(),
vec![task.to_owned().from],
task.to_owned().id,
String::new(),
);

let encoded_query =
query.lib["content"][0]["data"]
.as_str()
.ok_or(FocusError::ParsingError(format!(
"Not a valid library: Field .content[0].data not found. Library: {}",
query.lib.to_string()
query.lib
)))?;

let mut key_exists = false;
Expand Down Expand Up @@ -344,12 +335,12 @@ fn replace_cql_library(mut query: Query) -> Result<Query, FocusError> {
.as_str()
.ok_or(FocusError::ParsingError(format!(
"{} is not a valid library: Field .content[0].data not found.",
query.lib.to_string()
query.lib
)))?;

let decoded_cql = general_purpose::STANDARD
.decode(old_data_string)
.map_err(|e| FocusError::DecodeError(e))?;
.map_err(FocusError::DecodeError)?;

let decoded_string = str::from_utf8(&decoded_cql)
.map_err(|_| FocusError::ParsingError("CQL query was invalid".into()))?;
Expand Down Expand Up @@ -380,10 +371,10 @@ fn beam_result(
measure_report: String,
) -> Result<BeamResult, FocusError> {
let data = general_purpose::STANDARD.encode(measure_report.as_bytes());
return Ok(beam::beam_result::succeeded(
Ok(beam::beam_result::succeeded(
CONFIG.beam_app_id_long.clone(),
vec![task.from],
task.id,
data
));
))
}
24 changes: 7 additions & 17 deletions src/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,8 +69,6 @@ struct MeasureReport {
type_: String, //because "type" is a reserved keyword
}

const MU: f64 = 0.;

pub(crate) fn get_json_field(json_string: &str, field: &str) -> Result<Value, serde_json::Error> {
let json: Value = serde_json::from_str(json_string)?;
Ok(json[field].clone())
Expand Down Expand Up @@ -111,6 +109,7 @@ pub(crate) fn replace_cql(decoded_library: impl Into<String>) -> String {
("DKTK_STRAT_PROCEDURE_STRATIFIER", "define Procedure:\nif InInitialPopulation then [Procedure] else {} as List <Procedure>\n\ndefine function ProcedureType(procedure FHIR.Procedure):\nprocedure.category.coding.where(system = 'http://dktk.dkfz.de/fhir/onco/core/CodeSystem/SYSTTherapieartCS').code.first()"),
("DKTK_STRAT_MEDICATION_STRATIFIER", "define MedicationStatement:\nif InInitialPopulation then [MedicationStatement] else {} as List <MedicationStatement>"),
("DKTK_STRAT_ENCOUNTER_STRATIFIER", "define Encounter:\nif InInitialPopulation then [Encounter] else {} as List<Encounter>\n\ndefine function Departments(encounter FHIR.Encounter):\nencounter.identifier.where(system = 'http://dktk.dkfz.de/fhir/sid/hki-department').value.first()"),
("DKTK_STRAT_HISTOLOGY_STRATIFIER", "define Histo:\nif InInitialPopulation then [Observation] else {} as List <Observation>\n\ndefine function Histlogoy(histo FHIR.Observation):\n if histo.code.coding.where(code = '59847-4').code.first() is null then 0 else 1"),
("DKTK_STRAT_DEF_IN_INITIAL_POPULATION", "define InInitialPopulation:"),
("EXLIQUID_STRAT_DEF_IN_INITIAL_POPULATION", "define InInitialPopulation:\n exists ExliquidSpecimen and\n"),
("EXLIQUID_STRAT_W_ALIQUOTS", "define InInitialPopulation: exists ExliquidSpecimenWithAliquot and \n"),
Expand Down Expand Up @@ -157,7 +156,6 @@ pub fn obfuscate_counts_mr(
"patients" => {
obfuscate_counts_recursive(
&mut g.population,
MU,
delta_patient,
epsilon,
1,
Expand All @@ -168,7 +166,6 @@ pub fn obfuscate_counts_mr(
)?;
obfuscate_counts_recursive(
&mut g.stratifier,
MU,
delta_patient,
epsilon,
2,
Expand All @@ -181,7 +178,6 @@ pub fn obfuscate_counts_mr(
"diagnosis" => {
obfuscate_counts_recursive(
&mut g.population,
MU,
delta_diagnosis,
epsilon,
1,
Expand All @@ -192,7 +188,6 @@ pub fn obfuscate_counts_mr(
)?;
obfuscate_counts_recursive(
&mut g.stratifier,
MU,
delta_diagnosis,
epsilon,
2,
Expand All @@ -205,7 +200,6 @@ pub fn obfuscate_counts_mr(
"specimen" => {
obfuscate_counts_recursive(
&mut g.population,
MU,
delta_specimen,
epsilon,
1,
Expand All @@ -216,7 +210,6 @@ pub fn obfuscate_counts_mr(
)?;
obfuscate_counts_recursive(
&mut g.stratifier,
MU,
delta_specimen,
epsilon,
2,
Expand All @@ -229,7 +222,6 @@ pub fn obfuscate_counts_mr(
"procedures" => {
obfuscate_counts_recursive(
&mut g.population,
MU,
delta_procedures,
epsilon,
1,
Expand All @@ -240,7 +232,6 @@ pub fn obfuscate_counts_mr(
)?;
obfuscate_counts_recursive(
&mut g.stratifier,
MU,
delta_procedures,
epsilon,
2,
Expand All @@ -253,7 +244,6 @@ pub fn obfuscate_counts_mr(
"medicationStatements" => {
obfuscate_counts_recursive(
&mut g.population,
MU,
delta_medication_statements,
epsilon,
1,
Expand All @@ -264,7 +254,6 @@ pub fn obfuscate_counts_mr(
)?;
obfuscate_counts_recursive(
&mut g.stratifier,
MU,
delta_medication_statements,
epsilon,
2,
Expand All @@ -287,7 +276,6 @@ pub fn obfuscate_counts_mr(

fn obfuscate_counts_recursive(
val: &mut Value,
mu: f64,
delta: f64,
epsilon: f64,
bin: Bin,
Expand All @@ -301,7 +289,7 @@ fn obfuscate_counts_recursive(
Value::Object(map) => {
if let Some(count_val) = map.get_mut("count") {
if let Some(count) = count_val.as_u64() {
if count >= 1 && count <= 10 {
if (1..=10).contains(&count) {
*count_val = json!(10);
} else {
let obfuscated = get_from_cache_or_privatize(
Expand All @@ -315,7 +303,7 @@ fn obfuscate_counts_recursive(
rounding_step,
&mut rng,
)
.map_err(|e| FocusError::LaplaceError(e));
.map_err(FocusError::LaplaceError);

*count_val = json!(obfuscated?);
}
Expand All @@ -324,7 +312,6 @@ fn obfuscate_counts_recursive(
for (_, sub_val) in map.iter_mut() {
obfuscate_counts_recursive(
sub_val,
mu,
delta,
epsilon,
bin,
Expand All @@ -339,7 +326,6 @@ fn obfuscate_counts_recursive(
for sub_val in vec.iter_mut() {
obfuscate_counts_recursive(
sub_val,
mu,
delta,
epsilon,
bin,
Expand Down Expand Up @@ -505,6 +491,10 @@ mod test {

assert_eq!(replace_cql(decoded_library), expected_result);

let decoded_library = "DKTK_STRAT_HISTOLOGY_STRATIFIER";
let expected_result = "define Histo:\nif InInitialPopulation then [Observation] else {} as List <Observation>\n\ndefine function Histlogoy(histo FHIR.Observation):\n if histo.code.coding.where(code = '59847-4').code.first() is null then 0 else 1\n";
assert_eq!(replace_cql(decoded_library), expected_result);

let decoded_library = "INVALID_KEY";
let expected_result = "INVALID_KEY";
assert_eq!(replace_cql(decoded_library), expected_result);
Expand Down

0 comments on commit 5113864

Please sign in to comment.