From d921d6b824bfc504d0303f25b12348b2e9b0db18 Mon Sep 17 00:00:00 2001 From: Enola Knezevic Date: Fri, 1 Mar 2024 09:36:44 +0100 Subject: [PATCH 01/11] number of retries reduced to 150 again --- src/task_processing.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/task_processing.rs b/src/task_processing.rs index dddaa1f..d24f49b 100644 --- a/src/task_processing.rs +++ b/src/task_processing.rs @@ -66,7 +66,7 @@ async fn handle_beam_task(task: BeamTask, local_obf_cache: Arc>, } }; - const MAX_TRIES: u32 = 3600; + const MAX_TRIES: u32 = 150; for attempt in 0..MAX_TRIES { match beam::answer_task(&result).await { Ok(_) => break, From 1d8a459b5876aa7341da7833d4d30f0c4112faa4 Mon Sep 17 00:00:00 2001 From: janskiba Date: Mon, 18 Mar 2024 12:50:11 +0000 Subject: [PATCH 02/11] fix: Poll tasks sequentially again but process them concurrently --- Cargo.toml | 3 +- dev/docker-compose.yml | 1 + src/main.rs | 152 ++++++++++++++++++++++------------ src/task_processing.rs | 180 ++++++++++++++--------------------------- 4 files changed, 163 insertions(+), 173 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index d6e5e40..19735ac 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -25,8 +25,9 @@ tracing-subscriber = { version = "0.3.11", default_features = false, features = once_cell = "1.18" # Command Line Interface -clap = { version = "4.0", default_features = false, features = ["std", "env", "derive", "help"] } +clap = { version = "4", default_features = false, features = ["std", "env", "derive", "help", "color"] } rand = { default-features = false, version = "0.8.5" } +futures-util = { version = "0.3", default-features = false, features = ["std"] } [dev-dependencies] tokio-test = "0.4.2" diff --git a/dev/docker-compose.yml b/dev/docker-compose.yml index 4396d18..b4a73c7 100644 --- a/dev/docker-compose.yml +++ b/dev/docker-compose.yml @@ -86,6 +86,7 @@ services: BEAM_PROXY_URL: http://proxy1:8081 RETRY_COUNT: 30 OBFUSCATE: "no" + RUST_LOG: "debug,hyper=info" blaze: image: samply/blaze volumes: diff --git a/src/main.rs b/src/main.rs index 64218b9..c7919df 100644 --- a/src/main.rs +++ b/src/main.rs @@ -12,17 +12,21 @@ mod intermediate_rep; mod task_processing; mod util; +use base64::engine::general_purpose; use base64::{engine::general_purpose::STANDARD as BASE64, Engine as _}; -use beam_lib::{MsgId, TaskRequest, TaskResult}; +use beam_lib::{TaskRequest, TaskResult}; +use futures_util::future::BoxFuture; +use futures_util::FutureExt; use laplace_rs::ObfCache; -use task_processing::TaskQueue; use tokio::sync::Mutex; +use crate::blaze::parse_blaze_query; +use crate::config::EndpointType; use crate::util::{is_cql_tampered_with, obfuscate_counts_mr}; use crate::{config::CONFIG, errors::FocusError}; use blaze::CqlQuery; -use std::collections::{HashMap, HashSet}; +use std::collections::HashMap; use std::ops::DerefMut; use std::process::ExitCode; use std::str; @@ -103,64 +107,110 @@ pub async fn main() -> ExitCode { } async fn main_loop() -> ExitCode { - // TODO: The report cache init should be an fn on the cache - let report_cache: ReportCache = ReportCache::new(); - - let mut seen_tasks = Default::default(); - let mut task_queue = task_processing::spawn_task_workers(report_cache); + let endpoint_service_available: fn() -> BoxFuture<'static, bool> = match CONFIG.endpoint_type { + EndpointType::Blaze => || blaze::check_availability().boxed(), + EndpointType::Omop => || async { true }.boxed(), // TODO health check + }; let mut failures = 0; - while failures < CONFIG.retry_count { - if failures > 0 { - warn!( - "Retrying connection (attempt {}/{})", - failures + 1, + while !(beam::check_availability().await && endpoint_service_available().await) { + failures += 1; + if failures >= CONFIG.retry_count { + error!( + "Encountered too many errors -- exiting after {} attempts.", CONFIG.retry_count ); - tokio::time::sleep(Duration::from_secs(2)).await; + return ExitCode::from(22); } - if !(beam::check_availability().await) { - failures += 1; - } - if CONFIG.endpoint_type == config::EndpointType::Blaze { - if !(blaze::check_availability().await) { - failures += 1; - } - } else if CONFIG.endpoint_type == config::EndpointType::Omop { + tokio::time::sleep(Duration::from_secs(2)).await; + warn!( + "Retrying connection (attempt {}/{})", + failures, + CONFIG.retry_count + ); + }; + let report_cache = Arc::new(Mutex::new(ReportCache::new())); + let obf_cache = Arc::new(Mutex::new(ObfCache { + cache: Default::default(), + })); + task_processing::process_tasks(move |task| { + let obf_cache = obf_cache.clone(); + let report_cache = report_cache.clone(); + process_task(&task, obf_cache, report_cache).boxed_local() + }).await; + ExitCode::FAILURE +} - //TODO health check - } +async fn process_task( + task: &BeamTask, + obf_cache: Arc>, + report_cache: Arc>, +) -> Result { + debug!("Processing task {}", task.id); - if let Err(e) = process_tasks(&mut task_queue, &mut seen_tasks).await { - warn!("Encountered the following error, while processing tasks: {e}"); - failures += 1; - } else { - failures = 0; - } + let metadata: Metadata = serde_json::from_value(task.metadata.clone()).unwrap_or(Metadata { + project: "default_obfuscation".to_string(), + execute: true, + }); + + if metadata.project == "focus-healthcheck" { + return Ok(beam::beam_result::succeeded( + CONFIG.beam_app_id_long.clone(), + vec![task.from.clone()], + task.id, + "healthy".into() + )); } - error!( - "Encountered too many errors -- exiting after {} attempts.", - CONFIG.retry_count - ); - ExitCode::from(22) -} -async fn process_tasks( - task_queue: &mut TaskQueue, - seen: &mut HashSet, -) -> Result<(), FocusError> { - debug!("Start processing tasks..."); - let tasks = beam::retrieve_tasks().await?; - for task in tasks { - if seen.contains(&task.id) { - continue; + if metadata.project == "exporter" { + let body = &task.body; + return Ok(run_exporter_query(task, body, metadata.execute).await)?; + } + + if CONFIG.endpoint_type == EndpointType::Blaze { + let query = parse_blaze_query(task)?; + if query.lang == "cql" { + // TODO: Change query.lang to an enum + + Ok(run_cql_query(task, &query, obf_cache, report_cache, metadata.project).await)? + } else { + warn!("Can't run queries with language {} in Blaze", query.lang); + Ok(beam::beam_result::perm_failed( + CONFIG.beam_app_id_long.clone(), + vec![task.from.clone()], + task.id, + format!( + "Can't run queries with language {} and/or endpoint type {}", + query.lang, CONFIG.endpoint_type + ), + )) } - seen.insert(task.id); - task_queue - .send(task) - .await - .expect("Receiver is never dropped"); + } else if CONFIG.endpoint_type == EndpointType::Omop { + let decoded = util::base64_decode(&task.body)?; + let intermediate_rep_query: intermediate_rep::IntermediateRepQuery = + serde_json::from_slice(&decoded).map_err(|e| FocusError::ParsingError(e.to_string()))?; + //TODO check that the language is ast + let query_decoded = general_purpose::STANDARD + .decode(intermediate_rep_query.query) + .map_err(FocusError::DecodeError)?; + let ast: ast::Ast = + serde_json::from_slice(&query_decoded).map_err(|e| FocusError::ParsingError(e.to_string()))?; + + Ok(run_intermediate_rep_query(task, ast).await)? + } else { + warn!( + "Can't run queries with endpoint type {}", + CONFIG.endpoint_type + ); + Ok(beam::beam_result::perm_failed( + CONFIG.beam_app_id_long.clone(), + vec![task.from.clone()], + task.id, + format!( + "Can't run queries with endpoint type {}", + CONFIG.endpoint_type + ), + )) } - Ok(()) } async fn run_cql_query( diff --git a/src/task_processing.rs b/src/task_processing.rs index d24f49b..d1737a6 100644 --- a/src/task_processing.rs +++ b/src/task_processing.rs @@ -1,60 +1,71 @@ -use std::{sync::Arc, collections::HashMap, time::Duration}; +use std::{rc::Rc, time::Duration}; -use base64::{engine::general_purpose, Engine as _}; -use laplace_rs::ObfCache; -use tokio::sync::{mpsc, Semaphore, Mutex}; +use futures_util::{future::LocalBoxFuture, FutureExt, Stream, StreamExt}; use tracing::{error, warn, debug, Instrument, info_span}; -use crate::{ReportCache, errors::FocusError, beam, BeamTask, BeamResult, run_exporter_query, config::{EndpointType, CONFIG}, run_cql_query, intermediate_rep, ast, run_intermediate_rep_query, Metadata, blaze::parse_blaze_query, util}; +use crate::{errors::FocusError, beam, BeamTask, BeamResult}; const NUM_WORKERS: usize = 3; -const WORKER_BUFFER: usize = 32; -pub type TaskQueue = mpsc::Sender; - -pub fn spawn_task_workers(report_cache: ReportCache) -> TaskQueue { - let (tx, mut rx) = mpsc::channel::(WORKER_BUFFER); - - let obf_cache = Arc::new(Mutex::new(ObfCache { - cache: HashMap::new(), - })); - - let report_cache: Arc> = Arc::new(Mutex::new(report_cache)); - - tokio::spawn(async move { - let semaphore = Arc::new(Semaphore::new(NUM_WORKERS)); - while let Some(task) = rx.recv().await { - let permit = semaphore.clone().acquire_owned().await.unwrap(); - let local_report_cache = report_cache.clone(); - let local_obf_cache = obf_cache.clone(); - tokio::spawn(async move { - let span = info_span!("task handling", %task.id); - handle_beam_task(task, local_obf_cache, local_report_cache).instrument(span).await; - drop(permit) - }); - } - }); - - tx +pub async fn process_tasks(task_hanlder: F) +where + F: Fn(&BeamTask) -> LocalBoxFuture<'_, Result> + Clone + 'static, +{ + stream_task_results(task_hanlder) + .buffer_unordered(NUM_WORKERS) + .for_each_concurrent(None, |(task, task_result)| answer_task_result(task, task_result)) + .await } -async fn handle_beam_task(task: BeamTask, local_obf_cache: Arc>, local_report_cache: Arc>) { - let task_claiming = beam::claim_task(&task); - let mut task_processing = std::pin::pin!(process_task(&task, local_obf_cache, local_report_cache)); - let task_result = tokio::select! { - // If task task processing happens before claiming is done drop the task claiming future - task_processed = &mut task_processing => { - task_processed - }, - task_claimed = task_claiming => { - if let Err(e) = task_claimed { - warn!("Failed to claim task: {e}"); - } else { - debug!("Successfully claimed task"); - }; - task_processing.await - } +fn stream_task_results(on_task: F) -> impl Stream)>> +where + F: Fn(&BeamTask) -> LocalBoxFuture<'_, Result> + Clone + 'static, +{ + let on_task_claimed = |res| if let Err(e) = res { + warn!("Failed to claim task: {e}"); + } else { + debug!("Successfully claimed task"); }; + futures_util::stream::repeat_with(beam::retrieve_tasks) + .filter_map(|v| async { + match v.await { + Ok(mut ts) => ts.pop(), + Err(e) => { + warn!("Failed to get tasks from beam: {e}"); + tokio::time::sleep(Duration::from_secs(10)).await; + None + }, + } + }) + .then(move |t| { + let id = t.id; + let span = info_span!("task", %id); + let span_for_handler = span.clone(); + let task = Rc::new(t); + let on_task = on_task.clone(); + async move { + let task1 = Rc::clone(&task); + let task2 = Rc::clone(&task); + let mut task_claiming = std::pin::pin!(beam::claim_task(&task2)); + let mut task_processing = async move { + on_task(&task1).instrument(span_for_handler).await + }.boxed_local(); + tokio::select! { + task_processed = &mut task_processing => { + tracing::debug!("Proccessed task before claimed"); + on_task_claimed(task_claiming.as_mut().await); + futures_util::future::ready((Rc::try_unwrap(task).unwrap(), task_processed)).boxed_local() + }, + task_claimed = &mut task_claiming => { + on_task_claimed(task_claimed); + task_processing.map(|v| (Rc::try_unwrap(task).unwrap(), v)).boxed_local() + } + } + }.instrument(span) + }) +} + +async fn answer_task_result(task: BeamTask, task_result: Result) { let result = match task_result { Ok(res) => res, Err(e) => { @@ -85,76 +96,3 @@ async fn handle_beam_task(task: BeamTask, local_obf_cache: Arc>, tokio::time::sleep(Duration::from_secs(2)).await; } } - -async fn process_task( - task: &BeamTask, - obf_cache: Arc>, - report_cache: Arc>, -) -> Result { - debug!("Processing task {}", task.id); - - let metadata: Metadata = serde_json::from_value(task.metadata.clone()).unwrap_or(Metadata { - project: "default_obfuscation".to_string(), - execute: true, - }); - - if metadata.project == "focus-healthcheck" { - return Ok(beam::beam_result::succeeded( - CONFIG.beam_app_id_long.clone(), - vec![task.from.clone()], - task.id, - "healthy".into() - )); - } - - if metadata.project == "exporter" { - let body = &task.body; - return Ok(run_exporter_query(task, body, metadata.execute).await)?; - } - - if CONFIG.endpoint_type == EndpointType::Blaze { - let query = parse_blaze_query(task)?; - if query.lang == "cql" { - // TODO: Change query.lang to an enum - - Ok(run_cql_query(task, &query, obf_cache, report_cache, metadata.project).await)? - } else { - warn!("Can't run queries with language {} in Blaze", query.lang); - Ok(beam::beam_result::perm_failed( - CONFIG.beam_app_id_long.clone(), - vec![task.from.clone()], - task.id, - format!( - "Can't run queries with language {} and/or endpoint type {}", - query.lang, CONFIG.endpoint_type - ), - )) - } - } else if CONFIG.endpoint_type == EndpointType::Omop { - let decoded = util::base64_decode(&task.body)?; - let intermediate_rep_query: intermediate_rep::IntermediateRepQuery = - serde_json::from_slice(&decoded).map_err(|e| FocusError::ParsingError(e.to_string()))?; - //TODO check that the language is ast - let query_decoded = general_purpose::STANDARD - .decode(intermediate_rep_query.query) - .map_err(FocusError::DecodeError)?; - let ast: ast::Ast = - serde_json::from_slice(&query_decoded).map_err(|e| FocusError::ParsingError(e.to_string()))?; - - Ok(run_intermediate_rep_query(task, ast).await)? - } else { - warn!( - "Can't run queries with endpoint type {}", - CONFIG.endpoint_type - ); - Ok(beam::beam_result::perm_failed( - CONFIG.beam_app_id_long.clone(), - vec![task.from.clone()], - task.id, - format!( - "Can't run queries with endpoint type {}", - CONFIG.endpoint_type - ), - )) - } -} From b1727c4f185cb901a77ed9f796e979e034b27ea4 Mon Sep 17 00:00:00 2001 From: janskiba Date: Wed, 20 Mar 2024 13:11:29 +0000 Subject: [PATCH 03/11] Pretty logs --- Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Cargo.toml b/Cargo.toml index 19735ac..70c6dd2 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -19,7 +19,7 @@ laplace_rs = {version = "0.2.0", git = "https://github.com/samply/laplace-rs.git # Logging tracing = { version = "0.1.37", default_features = false } -tracing-subscriber = { version = "0.3.11", default_features = false, features = ["env-filter", "fmt"] } +tracing-subscriber = { version = "0.3.11", default_features = false, features = ["env-filter", "ansi"] } # Global variables once_cell = "1.18" From c00e74252593835a402f53c5fc14ecbe3ff8b736 Mon Sep 17 00:00:00 2001 From: Enola Knezevic Date: Wed, 20 Mar 2024 17:58:46 +0100 Subject: [PATCH 04/11] prism age stratifier --- resources/cql/PRISM_BBMRI_STRAT_AGE_STRATIFIER | 2 ++ 1 file changed, 2 insertions(+) create mode 100644 resources/cql/PRISM_BBMRI_STRAT_AGE_STRATIFIER diff --git a/resources/cql/PRISM_BBMRI_STRAT_AGE_STRATIFIER b/resources/cql/PRISM_BBMRI_STRAT_AGE_STRATIFIER new file mode 100644 index 0000000..7c9c6c2 --- /dev/null +++ b/resources/cql/PRISM_BBMRI_STRAT_AGE_STRATIFIER @@ -0,0 +1,2 @@ +define AgeClass: + (AgeInYears()) From 68ad1720c50281a8fdbc16f318858a96a4e10dac Mon Sep 17 00:00:00 2001 From: Jan <59206115+Threated@users.noreply.github.com> Date: Thu, 21 Mar 2024 09:20:34 +0100 Subject: [PATCH 05/11] chore: typo Co-authored-by: Tobias Kussel --- src/task_processing.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/task_processing.rs b/src/task_processing.rs index d1737a6..dea497b 100644 --- a/src/task_processing.rs +++ b/src/task_processing.rs @@ -7,11 +7,11 @@ use crate::{errors::FocusError, beam, BeamTask, BeamResult}; const NUM_WORKERS: usize = 3; -pub async fn process_tasks(task_hanlder: F) +pub async fn process_tasks(task_handler: F) where F: Fn(&BeamTask) -> LocalBoxFuture<'_, Result> + Clone + 'static, { - stream_task_results(task_hanlder) + stream_task_results(task_handler) .buffer_unordered(NUM_WORKERS) .for_each_concurrent(None, |(task, task_result)| answer_task_result(task, task_result)) .await From 152c8ac579d110b6570ff4f496afa86133dcd181 Mon Sep 17 00:00:00 2001 From: "p.delpy@dkfz-heidelberg.de" Date: Wed, 3 Apr 2024 08:13:18 +0200 Subject: [PATCH 06/11] feature: add single year age stratifier for condition.onset.value for ITCC --- resources/cql/ITCC_STRAT_AGE_CLASS_STRATIFIER | 2 ++ 1 file changed, 2 insertions(+) create mode 100644 resources/cql/ITCC_STRAT_AGE_CLASS_STRATIFIER diff --git a/resources/cql/ITCC_STRAT_AGE_CLASS_STRATIFIER b/resources/cql/ITCC_STRAT_AGE_CLASS_STRATIFIER new file mode 100644 index 0000000..3ab0641 --- /dev/null +++ b/resources/cql/ITCC_STRAT_AGE_CLASS_STRATIFIER @@ -0,0 +1,2 @@ +define function DiagnosisAge(condition FHIR.Condition): +condition.onset.value \ No newline at end of file From 64c494eabfd534e99f280d42629c32821be39482 Mon Sep 17 00:00:00 2001 From: Enola Knezevic Date: Wed, 3 Apr 2024 13:55:45 +0200 Subject: [PATCH 07/11] prism age placeholder rename because reasons --- ...BMRI_STRAT_AGE_STRATIFIER => PRISM_STRAT_AGE_STRATIFIER_BBMRI} | 0 1 file changed, 0 insertions(+), 0 deletions(-) rename resources/cql/{PRISM_BBMRI_STRAT_AGE_STRATIFIER => PRISM_STRAT_AGE_STRATIFIER_BBMRI} (100%) diff --git a/resources/cql/PRISM_BBMRI_STRAT_AGE_STRATIFIER b/resources/cql/PRISM_STRAT_AGE_STRATIFIER_BBMRI similarity index 100% rename from resources/cql/PRISM_BBMRI_STRAT_AGE_STRATIFIER rename to resources/cql/PRISM_STRAT_AGE_STRATIFIER_BBMRI From 008958e6690aa1899041a8ab114bf006f06f0912 Mon Sep 17 00:00:00 2001 From: lablans Date: Wed, 3 Apr 2024 12:30:10 +0000 Subject: [PATCH 08/11] Use generalized Rust CI/CD --- .github/workflows/rust.yml | 142 +++---------------------------------- Dockerfile | 9 ++- dev/focusdev | 2 +- 3 files changed, 16 insertions(+), 137 deletions(-) diff --git a/.github/workflows/rust.yml b/.github/workflows/rust.yml index bebe260..a464e43 100644 --- a/.github/workflows/rust.yml +++ b/.github/workflows/rust.yml @@ -8,141 +8,17 @@ on: # Fetch new base image updates every night at 1am - cron: '0 1 * * *' -env: - CARGO_TERM_COLOR: always - PROFILE: release - jobs: - pre-check: - name: Security, License Check - runs-on: ubuntu-22.04 - - steps: - - uses: actions/checkout@v3 - - uses: EmbarkStudios/cargo-deny-action@v1 - - build-rust: - name: Build (Rust) - runs-on: ubuntu-22.04 - - strategy: - matrix: - arch: - - amd64 - - arm64 - - steps: - - name: Set arch ${{ matrix.arch }} - env: - ARCH: ${{ matrix.arch }} - run: | - if [ "${ARCH}" == "arm64" ]; then - echo "rustarch=aarch64-unknown-linux-gnu" >> $GITHUB_ENV - elif [ "${ARCH}" == "amd64" ]; then - echo "rustarch=x86_64-unknown-linux-gnu" >> $GITHUB_ENV - else - exit 1 - fi - if [ "$(dpkg --print-architecture)" != "${ARCH}" ]; then - echo "Cross-compiling to ${ARCH}." - echo "is_cross=true" >> $GITHUB_ENV - else - echo "Natively compiling to ${ARCH}." - echo "is_cross=false" >> $GITHUB_ENV - fi - - name: Set profile ${{ env.PROFILE }} - env: - PROFILE: ${{ env.PROFILE }} - run: | - if [ "${PROFILE}" == "release" ]; then - echo "profilestr=--release" >> $GITHUB_ENV - elif [ "${PROFILE}" == "debug" ]; then - echo "profilestr=" >> $GITHUB_ENV - else - echo "profilestr=--profile $PROFILE" >> $GITHUB_ENV - fi - - uses: actions/checkout@v3 - - uses: actions-rs/toolchain@v1 - with: - toolchain: stable - override: true - target: ${{ env.rustarch }} - - uses: Swatinem/rust-cache@v2 - with: - key: ${{ matrix.arch }}-${{ env.PROFILE }} - prefix-key: "v1-rust" # Increase to invalidate old caches. - - name: Build (cross to ${{ matrix.arch }}) - if: env.is_cross == 'true' - uses: actions-rs/cargo@v1 - with: - use-cross: ${{ env.is_cross }} - command: build - args: --target ${{ env.rustarch }} ${{ matrix.features && format('--features {0}', matrix.features) }} ${{ env.profilestr }} - - name: Build (native) - if: env.is_cross == 'false' - run: | - BINS=$(cargo build --tests --bins --message-format=json --target ${{ env.rustarch }} ${{ matrix.features && format('--features {0}', matrix.features) }} ${{ env.profilestr }} | jq -r 'select(.profile.test == true) | .executable | select(. != null)') - mkdir -p testbinaries/ - for testbin in $BINS; do - mv -v $testbin testbinaries/ - done - - name: Upload (bins) - uses: actions/upload-artifact@v3 - with: - name: binaries-${{ matrix.arch }} - path: | - target/${{ env.rustarch }}/${{ env.PROFILE }}/focus - - name: Upload (test, native only) - if: matrix.arch == 'amd64' - uses: actions/upload-artifact@v3 - with: - name: testbinaries-${{ matrix.arch }} - path: | - testbinaries/* - - test: - name: Run tests - needs: [ build-rust ] - runs-on: ubuntu-22.04 - - steps: - - uses: actions/checkout@v3 - - uses: actions/download-artifact@v3 - with: - name: testbinaries-amd64 - path: testbinaries/ - - run: | - for testbin in testbinaries/*; do - chmod +x $testbin - $testbin - done - - docker-focus: - needs: [ build-rust, pre-check, test ] - if: github.ref_protected == true || github.event_name == 'workflow_dispatch' - - # This workflow defines how a maven package is built, tested and published. - # Visit: https://github.com/samply/github-workflows/blob/develop/.github/workflows/docker-ci.yml, for more information - uses: samply/github-workflows/.github/workflows/docker-ci.yml@main + build-with-samply: + uses: samply/github-workflows/.github/workflows/rust.yml@main with: - # The Docker Hub Repository you want eventually push to, e.g samply/share-client - image-name: "samply/focus" - # Define special prefixes for docker tags. They will prefix each images tag. - # image-tag-prefix: "foo" - # Define the build context of your image, typically default '.' will be enough - # build-context: '.' - # Define the Dockerfile of your image, typically default './Dockerfile' will be enough - build-file: './Dockerfile' - # NOTE: This doesn't work currently - # A list of build arguments, passed to the docker build -# build-args: | -# PROFILE=${{ env.PROFILE }} -# COMPONENT=broker - # Define the target platforms of the docker build (default "linux/amd64,linux/arm64/v8") - # build-platforms: "linux/amd64" - # If your actions generate an artifact in a previous build step, you can tell this workflow to download it - artifact-name: '*' - # This passes the secrets from calling workflow to the called workflow + image-prefix: "samply/" + components: '[ "focus" ]' + #architectures: '[ "amd64", "arm64" ]' + #profile: debug + test-via-script: false + #features: '[ "" ]' + push-to: ${{ (github.ref_protected == true || github.event_name == 'workflow_dispatch') && 'dockerhub' || 'ghcr' }} secrets: DOCKERHUB_USERNAME: ${{ secrets.DOCKERHUB_USERNAME }} DOCKERHUB_TOKEN: ${{ secrets.DOCKERHUB_TOKEN }} diff --git a/Dockerfile b/Dockerfile index 24f078a..9130271 100644 --- a/Dockerfile +++ b/Dockerfile @@ -7,10 +7,13 @@ FROM alpine AS chmodder ARG TARGETARCH -COPY /artifacts/binaries-$TARGETARCH/focus /app/ +ARG COMPONENT +ARG FEATURE +COPY /artifacts/binaries-$TARGETARCH$FEATURE/$COMPONENT /app/$COMPONENT RUN chmod +x /app/* FROM gcr.io/distroless/cc-debian12 -COPY --from=chmodder /app/* /usr/local/bin/ -ENTRYPOINT [ "/usr/local/bin/focus" ] +ARG COMPONENT +COPY --from=chmodder /app/$COMPONENT /usr/local/bin/samply +ENTRYPOINT [ "/usr/local/bin/samply" ] diff --git a/dev/focusdev b/dev/focusdev index 618541b..bd09418 100755 --- a/dev/focusdev +++ b/dev/focusdev @@ -86,7 +86,7 @@ function build() { function build_docker() { BACK2=$(pwd) cd $SD - docker compose build --build-arg TARGETARCH=$ARCH + docker compose build --build-arg TARGETARCH=$ARCH --build-arg COMPONENT=focus cd $BACK2 } From 8f4cb72abacfa00dcfefe7c279a554ebf193c3f3 Mon Sep 17 00:00:00 2001 From: lablans Date: Thu, 11 Apr 2024 13:56:46 +0000 Subject: [PATCH 09/11] Fix Histology typo --- resources/cql/DKTK_STRAT_HISTOLOGY_STRATIFIER | 2 +- src/util.rs | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/resources/cql/DKTK_STRAT_HISTOLOGY_STRATIFIER b/resources/cql/DKTK_STRAT_HISTOLOGY_STRATIFIER index 2a56610..1c5c077 100644 --- a/resources/cql/DKTK_STRAT_HISTOLOGY_STRATIFIER +++ b/resources/cql/DKTK_STRAT_HISTOLOGY_STRATIFIER @@ -1,5 +1,5 @@ define Histo: if InInitialPopulation then [Observation] else {} as List -define function Histlogoy(histo FHIR.Observation): +define function Histology(histo FHIR.Observation): if histo.code.coding.where(code = '59847-4').code.first() is null then 0 else 1 diff --git a/src/util.rs b/src/util.rs index 47b938d..c24cd4e 100644 --- a/src/util.rs +++ b/src/util.rs @@ -474,7 +474,7 @@ mod test { assert_eq!(replace_cql(decoded_library), expected_result); let decoded_library = "DKTK_STRAT_HISTOLOGY_STRATIFIER"; - let expected_result = "define Histo:\nif InInitialPopulation then [Observation] else {} as List \n\ndefine function Histlogoy(histo FHIR.Observation):\n if histo.code.coding.where(code = '59847-4').code.first() is null then 0 else 1\n"; + let expected_result = "define Histo:\nif InInitialPopulation then [Observation] else {} as List \n\ndefine function Histology(histo FHIR.Observation):\n if histo.code.coding.where(code = '59847-4').code.first() is null then 0 else 1\n"; assert_eq!(replace_cql(decoded_library), expected_result); let decoded_library = "INVALID_KEY"; From f59c30c46c0955b67ca6ccf0104c4175f02a1233 Mon Sep 17 00:00:00 2001 From: Torben Brenner Date: Mon, 22 Apr 2024 11:18:40 +0200 Subject: [PATCH 10/11] fix: change entrypoint of image to focus --- Dockerfile | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/Dockerfile b/Dockerfile index 9130271..62987bb 100644 --- a/Dockerfile +++ b/Dockerfile @@ -15,5 +15,4 @@ RUN chmod +x /app/* FROM gcr.io/distroless/cc-debian12 ARG COMPONENT COPY --from=chmodder /app/$COMPONENT /usr/local/bin/samply -ENTRYPOINT [ "/usr/local/bin/samply" ] - +ENTRYPOINT [ "/usr/local/bin/focus" ] From 6ecd5b8d6885d57df55c9ee6d7743b02e410cc99 Mon Sep 17 00:00:00 2001 From: janskiba Date: Tue, 14 May 2024 07:42:10 +0000 Subject: [PATCH 11/11] fix: Answer tasks as soon as they are ready --- src/main.rs | 2 +- src/task_processing.rs | 70 ++++++++++++++++++++---------------------- 2 files changed, 34 insertions(+), 38 deletions(-) diff --git a/src/main.rs b/src/main.rs index c7919df..6c3cdea 100644 --- a/src/main.rs +++ b/src/main.rs @@ -135,7 +135,7 @@ async fn main_loop() -> ExitCode { task_processing::process_tasks(move |task| { let obf_cache = obf_cache.clone(); let report_cache = report_cache.clone(); - process_task(&task, obf_cache, report_cache).boxed_local() + process_task(task, obf_cache, report_cache).boxed_local() }).await; ExitCode::FAILURE } diff --git a/src/task_processing.rs b/src/task_processing.rs index dea497b..ac55e6f 100644 --- a/src/task_processing.rs +++ b/src/task_processing.rs @@ -1,30 +1,22 @@ use std::{rc::Rc, time::Duration}; -use futures_util::{future::LocalBoxFuture, FutureExt, Stream, StreamExt}; -use tracing::{error, warn, debug, Instrument, info_span}; +use futures_util::{future::LocalBoxFuture, FutureExt, StreamExt}; +use tracing::{debug, error, info_span, warn, Instrument}; -use crate::{errors::FocusError, beam, BeamTask, BeamResult}; +use crate::{beam, errors::FocusError, BeamResult, BeamTask}; const NUM_WORKERS: usize = 3; -pub async fn process_tasks(task_handler: F) +pub async fn process_tasks(task_hanlder: F) where F: Fn(&BeamTask) -> LocalBoxFuture<'_, Result> + Clone + 'static, { - stream_task_results(task_handler) - .buffer_unordered(NUM_WORKERS) - .for_each_concurrent(None, |(task, task_result)| answer_task_result(task, task_result)) - .await -} - -fn stream_task_results(on_task: F) -> impl Stream)>> -where - F: Fn(&BeamTask) -> LocalBoxFuture<'_, Result> + Clone + 'static, -{ - let on_task_claimed = |res| if let Err(e) = res { - warn!("Failed to claim task: {e}"); - } else { - debug!("Successfully claimed task"); + let on_task_claimed = |res: &Result| { + if let Err(e) = res { + warn!("Failed to claim task: {e}"); + } else { + debug!("Successfully claimed task"); + } }; futures_util::stream::repeat_with(beam::retrieve_tasks) .filter_map(|v| async { @@ -34,43 +26,49 @@ where warn!("Failed to get tasks from beam: {e}"); tokio::time::sleep(Duration::from_secs(10)).await; None - }, + } } }) .then(move |t| { let id = t.id; let span = info_span!("task", %id); let span_for_handler = span.clone(); + let on_task = task_hanlder.clone(); let task = Rc::new(t); - let on_task = on_task.clone(); + let t1 = Rc::clone(&task); + let t2 = Rc::clone(&task); + #[allow(clippy::async_yields_async)] async move { - let task1 = Rc::clone(&task); - let task2 = Rc::clone(&task); - let mut task_claiming = std::pin::pin!(beam::claim_task(&task2)); - let mut task_processing = async move { - on_task(&task1).instrument(span_for_handler).await - }.boxed_local(); + let mut task_claiming = std::pin::pin!(beam::claim_task(&t1)); + let mut task_processing = async move { on_task(&t2).await }.boxed_local(); tokio::select! { task_processed = &mut task_processing => { - tracing::debug!("Proccessed task before claimed"); - on_task_claimed(task_claiming.as_mut().await); - futures_util::future::ready((Rc::try_unwrap(task).unwrap(), task_processed)).boxed_local() + debug!("Proccessed task before it was claimed"); + answer_task_result(&task, task_processed).await; + futures_util::future::ready(()).boxed_local() }, task_claimed = &mut task_claiming => { - on_task_claimed(task_claimed); - task_processing.map(|v| (Rc::try_unwrap(task).unwrap(), v)).boxed_local() + on_task_claimed(&task_claimed); + task_processing + .then(move |res| async move { answer_task_result(&task, res).await }) + .instrument(span_for_handler) + .boxed_local() } } - }.instrument(span) + } + .instrument(span) }) + .buffer_unordered(NUM_WORKERS) + .for_each(|_| async {}) + .await } -async fn answer_task_result(task: BeamTask, task_result: Result) { +async fn answer_task_result(task: &BeamTask, task_result: Result) { let result = match task_result { Ok(res) => res, Err(e) => { warn!("Failed to execute query: {e}"); - if let Err(e) = beam::fail_task(&task, e.user_facing_error()).await { + if let Err(e) = beam::fail_task(task, e.user_facing_error()).await { warn!("Failed to report failure to beam: {e}"); } return; @@ -82,9 +80,7 @@ async fn answer_task_result(task: BeamTask, task_result: Result break, Err(FocusError::ConfigurationError(s)) => { - error!( - "FATAL: Unable to report back to Beam due to a configuration issue: {s}" - ); + error!("FATAL: Unable to report back to Beam due to a configuration issue: {s}"); } Err(FocusError::UnableToAnswerTask(e)) => { warn!("Unable to report task result to Beam: {e}. Retrying (attempt {attempt}/{MAX_TRIES}).");