Skip to content

Commit

Permalink
Merge pull request #90 from samply/develop
Browse files Browse the repository at this point in the history
Exporter integration
  • Loading branch information
djuarezgf authored Dec 22, 2023
2 parents 5d5717d + b9f2fbc commit d916132
Show file tree
Hide file tree
Showing 5 changed files with 129 additions and 8 deletions.
3 changes: 2 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ BEAM_APP_ID_LONG = "app1.broker.example.com"
```bash
RETRY_COUNT = "32" # The maximum number of retries for beam and blaze healthchecks, default value: 32
ENDPOINT_TYPE = "blaze" # Type of the endpoint, allowed values: "blaze", "omop", default value: "blaze"
EXPORTER_URL = " https://exporter.site/" # The exporter URL
OBFUSCATE = "yes" # Should the results be obfuscated - the "master switch", allowed values: "yes", "no", default value: "yes"
OBFUSCATE_BELOW_10_MODE = "1" # The mode of obfuscating values below 10: 0 - return zero, 1 - return ten, 2 - obfuscate using Laplace distribution and rounding, has no effect if OBFUSCATE = "no", default value: 1
DELTA_PATIENT = "1." # Sensitivity parameter for obfuscating the counts in the Patient stratifier, has no effect if OBFUSCATE = "no", default value: 1
Expand All @@ -45,7 +46,7 @@ DELTA_PROCEDURES = "1.7" # Sensitivity parameter for obfuscating the counts in t
DELTA_MEDICATION_STATEMENTS = "2.1" # Sensitivity parameter for obfuscating the counts in the Medication Statements stratifier, has no effect if OBFUSCATE = "no", default value: 2.1
EPSILON = "0.1" # Privacy budget parameter for obfuscating the counts in the stratifiers, has no effect if OBFUSCATE = "no", default value: 0.1
ROUNDING_STEP = "10" # The granularity of the rounding of the obfuscated values, has no effect if OBFUSCATE = "no", default value: 10
PROJECTS_NO_OBFUSCATION = "exliquid;dktk_supervisors" # Projects for which the results are not to be obfuscated, separated by ;, default value: "exliquid; dktk_supervisors"
PROJECTS_NO_OBFUSCATION = "exliquid;dktk_supervisors;exporter" # Projects for which the results are not to be obfuscated, separated by ;, default value: "exliquid; dktk_supervisors"
QUERIES_TO_CACHE_FILE_PATH = "resources/bbmri" # The path to the file containing BASE64 encoded queries whose results are to be cached, if not set, no results are cached
PROVIDER = "name" #OMOP provider name
PROVIDER_ICON = "iVBORw0KGgoAAAANSUhEUgAAAAEAAAABAQMAAAAl21bKAAAAA1BMVEUAAACnej3aAAAAAXRSTlMAQObYZgAAAApJREFUCNdjYAAAAAIAAeIhvDMAAAAASUVORK5CYII=" #Base64 encoded OMOP provider icon
Expand Down
14 changes: 9 additions & 5 deletions src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,6 @@ impl fmt::Display for EndpointType {
}




pub(crate) static CONFIG: Lazy<Config> = Lazy::new(|| {
debug!("Loading config");
Config::load().unwrap_or_else(|e| {
Expand Down Expand Up @@ -69,14 +67,18 @@ struct CliArgs {
#[clap(long, env, value_parser, default_value = "32")]
retry_count: usize,

/// The endpoint base URL, e.g. https://blaze.site/fhir
/// The endpoint base URL, e.g. https://blaze.site/fhir/
#[clap(long, env, value_parser)]
endpoint_url: Option<Uri>,

/// The endpoint base URL, e.g. https://blaze.site/fhir, for the sake of backward compatibility, use endpoint_url instead
/// The endpoint base URL, e.g. https://blaze.site/fhir/, for the sake of backward compatibility, use endpoint_url instead
#[clap(long, env, value_parser)]
blaze_url: Option<Uri>,

/// The exporter URL, e.g. https://exporter.site/
#[clap(long, env, value_parser)]
exporter_url: Option<Uri>,

/// Type of the endpoint, e.g. "blaze", "omop"
#[clap(long, env, value_parser = clap::value_parser!(EndpointType), default_value = "blaze")]
endpoint_type: EndpointType,
Expand Down Expand Up @@ -122,7 +124,7 @@ struct CliArgs {
rounding_step: usize,

/// Projects for which the results are not to be obfuscated, separated by ;
#[clap(long, env, value_parser, default_value = "exliquid;dktk_supervisors")]
#[clap(long, env, value_parser, default_value = "exliquid;dktk_supervisors;exporter")]
projects_no_obfuscation: String,

/// The path to the file containing BASE64 encoded queries whose results are to be cached
Expand Down Expand Up @@ -153,6 +155,7 @@ pub(crate) struct Config {
pub api_key: String,
pub retry_count: usize,
pub endpoint_url: Uri,
pub exporter_url: Option<Uri>,
pub endpoint_type: EndpointType,
pub obfuscate: Obfuscate,
pub obfuscate_zero: bool,
Expand Down Expand Up @@ -194,6 +197,7 @@ impl Config {
api_key: cli_args.api_key,
retry_count: cli_args.retry_count,
endpoint_url: cli_args.endpoint_url.unwrap_or_else(|| cli_args.blaze_url.expect("Look, mate, you need to set endpoint-url or blaze-url, can't work without, sry")),
exporter_url: cli_args.exporter_url,
endpoint_type: cli_args.endpoint_type,
obfuscate: cli_args.obfuscate,
obfuscate_zero: cli_args.obfuscate_zero,
Expand Down
6 changes: 6 additions & 0 deletions src/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,15 @@ pub enum FocusError {
SerializationError(String),
#[error("Unable to post AST: {0}")]
UnableToPostAst(reqwest::Error),
#[error("Unable to post Exporter query: {0}")]
UnableToPostExporterQuery(reqwest::Error),
#[error("Exporter query error in Reqwest: {0}")]
ExporterQueryErrorReqwest(String),
#[error("AST Posting error in Reqwest: {0}")]
AstPostingErrorReqwest(String),
#[error("Invalid Header Value: {0}")]
InvalidHeaderValue(http::header::InvalidHeaderValue),
#[error("Missing Exporter Endpoint")]
MissingExporterEndpoint(),

}
79 changes: 79 additions & 0 deletions src/exporter.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
use http::header;
use http::HeaderMap;
use http::HeaderValue;
use http::StatusCode;
use tracing::{debug, warn};

use crate::config::CONFIG;
use crate::errors::FocusError;

struct Params {
method: &'static str,
doing: &'static str,
done: &'static str,
}

const CREATE: Params = Params {
method: "create-query",
doing: "creating",
done: "created",
};

const EXECUTE: Params = Params {
method: "request",
doing: "executing",
done: "executed",
};

pub async fn post_exporter_query(body: &String, execute: bool) -> Result<String, FocusError> {
let Some(exporter_url) = &CONFIG.exporter_url else {
return Err(FocusError::MissingExporterEndpoint());
};

let exporter_params = if execute { EXECUTE } else { CREATE };
debug!("{} exporter query...", exporter_params.doing);

let mut headers = HeaderMap::new();

headers.insert(
header::CONTENT_TYPE, //TODO discard the result, just return OK
HeaderValue::from_static("text/html; charset=UTF-8"),
);

if let Some(auth_header_value) = CONFIG.auth_header.clone() {
headers.insert(
header::AUTHORIZATION,
HeaderValue::from_str(auth_header_value.as_str())
.map_err(FocusError::InvalidHeaderValue)?,
);
}

let resp = CONFIG
.client
.post(format!("{}{}", exporter_url, exporter_params.method))
.headers(headers)
.body(body.clone())
.send()
.await
.map_err(FocusError::UnableToPostExporterQuery)?;

debug!("{} query...", exporter_params.done);

let text = match resp.status() {
StatusCode::OK => {
format!("Query successfully {}", exporter_params.done)
}
code => {
warn!(
"Got unexpected code {code} while {} query; reply was `{}`, debug info: {:?}",
exporter_params.doing, body, resp
);
return Err(FocusError::ExporterQueryErrorReqwest(format!(
"Error while {} query: {:?}",
exporter_params.doing, resp
)));
}
};

Ok(text)
}
35 changes: 33 additions & 2 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,11 @@ mod banner;
mod beam;
mod blaze;
mod config;
mod exporter;
mod logger;
mod errors;
mod graceful_shutdown;
mod logger;

mod intermediate_rep;
mod util;

Expand Down Expand Up @@ -39,6 +41,7 @@ type BeamResult = TaskResult<beam_lib::RawString>;
#[derive(Debug, Deserialize, Serialize, Clone)]
struct Metadata {
project: String,
execute: bool,
}

#[derive(Debug, Clone)]
Expand Down Expand Up @@ -141,13 +144,19 @@ async fn process_task(

let metadata: Metadata = serde_json::from_value(task.metadata.clone()).unwrap_or(Metadata {
project: "default_obfuscation".to_string(),
execute: true,
});

if CONFIG.endpoint_type == config::EndpointType::Blaze {
let query = parse_blaze_cql_query(task)?;
if query.lang == "cql" {
// TODO: Change query.lang to an enum
Ok(run_cql_query(task, &query, obf_cache, report_cache, metadata.project).await)?
if metadata.project == "exporter" {
let body = &task.body;
Ok(run_exporter_query(task, body, metadata.execute).await)?
} else {
Ok(run_cql_query(task, &query, obf_cache, report_cache, metadata.project).await)?
}
} else {
warn!("Can't run queries with language {} in Blaze", query.lang);
Ok(beam::beam_result::perm_failed(
Expand Down Expand Up @@ -385,6 +394,28 @@ async fn run_intermediate_rep_query(task: &BeamTask, ast: ast::Ast) -> Result<Be
Ok(result)
}

async fn run_exporter_query(
task: &BeamTask,
body: &String,
execute: bool,
) -> Result<BeamResult, FocusError> {
let mut err = beam::beam_result::perm_failed(
CONFIG.beam_app_id_long.clone(),
vec![task.to_owned().from],
task.to_owned().id,
String::new(),
);

let exporter_result = exporter::post_exporter_query(body, execute).await?;

let result = beam_result(task.to_owned(), exporter_result).unwrap_or_else(|e| {
err.body = beam_lib::RawString(e.to_string());
err
});

Ok(result)
}

fn replace_cql_library(mut query: CqlQuery) -> Result<CqlQuery, FocusError> {
let old_data_value = &query.lib["content"][0]["data"];

Expand Down

0 comments on commit d916132

Please sign in to comment.