From fab5f397a35733cc93b82087a44355049edc9f4b Mon Sep 17 00:00:00 2001 From: lablans Date: Tue, 19 Sep 2023 11:24:19 +0000 Subject: [PATCH 01/22] Fix erroneous warning --- Cargo.toml | 2 +- src/beam.rs | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index c07963e..53ebfe1 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "focus" -version = "0.1.0" +version = "0.1.1" edition = "2021" license = "Apache-2.0" diff --git a/src/beam.rs b/src/beam.rs index 8d80e16..8ae080d 100644 --- a/src/beam.rs +++ b/src/beam.rs @@ -350,7 +350,7 @@ pub async fn claim_task(task: &BeamTask) -> Result<(), FocusError> { .map_err(|e| FocusError::UnableToAnswerTask(e))?; match resp.status() { - StatusCode::NO_CONTENT => { + StatusCode::CREATED | StatusCode::NO_CONTENT => { info!("Task {} claimed", task.id); Ok(()) }, From 55d5922f45911143f34b306575c952f9455ac4c0 Mon Sep 17 00:00:00 2001 From: lablans Date: Tue, 19 Sep 2023 12:10:55 +0000 Subject: [PATCH 02/22] In Beam's TaskResults, use null rather than "unused" for empty fields. 
--- src/beam.rs | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/src/beam.rs b/src/beam.rs index 8d80e16..2e1bfc9 100644 --- a/src/beam.rs +++ b/src/beam.rs @@ -117,8 +117,8 @@ pub struct BeamResult { pub to: Vec, pub task: Uuid, pub status: Status, - pub metadata: String, - pub body: String, + pub metadata: Option, + pub body: Option, } #[derive(Debug, Serialize, Deserialize, Clone, Eq, PartialEq)] @@ -137,8 +137,8 @@ impl BeamResult { to, task, status: Status::Claimed, - metadata: "unused".to_owned(), - body: "unused".to_owned(), + metadata: None, + body: None, } } pub fn succeeded(from: AppId, to: Vec, task: Uuid, body: String) -> Self { @@ -147,8 +147,8 @@ impl BeamResult { to, task, status: Status::Succeeded, - metadata: "unused".to_owned(), - body, + metadata: None, + body: Some(body), } } @@ -158,8 +158,8 @@ impl BeamResult { to, task, status: Status::PermFailed, - metadata: "unused".to_owned(), - body, + metadata: None, + body: Some(body), } } } From 8b4c208ae362c7bcef98b977731d361d295f46b7 Mon Sep 17 00:00:00 2001 From: janskiba Date: Tue, 19 Sep 2023 16:21:46 +0000 Subject: [PATCH 03/22] First version of `focusdev` --- .github/workflows/rust.yml | 2 +- Dockerfile | 3 +- Dockerfile.ci | 15 ---- dev/docker-compose.yml | 106 ++++++++++++++++++++++++++++ dev/focusdev | 137 +++++++++++++++++++++++++++++++++++++ dev/pki/pki | 123 +++++++++++++++++++++++++++++++++ src/main.rs | 5 ++ 7 files changed, 374 insertions(+), 17 deletions(-) delete mode 100644 Dockerfile.ci create mode 100644 dev/docker-compose.yml create mode 100755 dev/focusdev create mode 100755 dev/pki/pki diff --git a/.github/workflows/rust.yml b/.github/workflows/rust.yml index 061f0ff..f6bf92f 100644 --- a/.github/workflows/rust.yml +++ b/.github/workflows/rust.yml @@ -111,7 +111,7 @@ jobs: # Define the build context of your image, typically default '.' will be enough # build-context: '.' 
# Define the Dockerfile of your image, typically default './Dockerfile' will be enough - build-file: './Dockerfile.ci' + build-file: './Dockerfile' # NOTE: This doesn't work currently # A list of build arguments, passed to the docker build # build-args: | diff --git a/Dockerfile b/Dockerfile index 6ec8b1f..e84eb22 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,11 +1,12 @@ # This assumes binaries are present, see COPY directive. +ARG IMGNAME=gcr.io/distroless/cc FROM alpine AS chmodder ARG TARGETARCH COPY /artifacts/binaries-$TARGETARCH/focus /app/ RUN chmod +x /app/* -FROM alpine +FROM ${IMGNAME} COPY --from=chmodder /app/* /usr/local/bin/ ENTRYPOINT [ "/usr/local/bin/focus" ] diff --git a/Dockerfile.ci b/Dockerfile.ci deleted file mode 100644 index 615ead6..0000000 --- a/Dockerfile.ci +++ /dev/null @@ -1,15 +0,0 @@ -# This assumes binaries are present, see COPY directive. - -FROM alpine AS chmodder -ARG TARGETARCH -COPY /artifacts/binaries-$TARGETARCH/focus /app/ -RUN chmod +x /app/* - -FROM gcr.io/distroless/cc -#ARG COMPONENT -#ARG TARGETARCH -#COPY /artifacts/binaries-$TARGETARCH/$COMPONENT /usr/local/bin/ -COPY --from=chmodder /app/* /usr/local/bin/ -#ENTRYPOINT [ "/usr/local/bin/$COMPONENT" ] -ENTRYPOINT [ "/usr/local/bin/focus" ] - diff --git a/dev/docker-compose.yml b/dev/docker-compose.yml new file mode 100644 index 0000000..ca2c236 --- /dev/null +++ b/dev/docker-compose.yml @@ -0,0 +1,106 @@ +version: "3.7" +services: + vault: + image: hashicorp/vault + ports: + - 127.0.0.1:8200:8200 + environment: + VAULT_DEV_ROOT_TOKEN_ID: ${VAULT_TOKEN} + VAULT_DEV_LISTEN_ADDRESS: 0.0.0.0:8200 + volumes: + - ./pki:/pki + networks: + - default + broker: + depends_on: [vault] + image: samply/beam-broker:develop + ports: + - 8080:8080 + - 8090:8090 + environment: + BROKER_URL: ${BROKER_URL} + PKI_ADDRESS: http://vault:8200 + no_proxy: vault + NO_PROXY: vault + PRIVKEY_FILE: /run/secrets/dummy.pem + BIND_ADDR: 0.0.0.0:8080 + MONITORING_API_KEY: ${BROKER_MONITORING_KEY} + 
RUST_LOG: ${RUST_LOG} + ALL_PROXY: http://mitmproxy:8080 + secrets: + - pki.secret + - dummy.pem + - root.crt.pem + mitmproxy: + image: mitmproxy/mitmproxy + stop_signal: SIGKILL + command: mitmweb --web-host 0.0.0.0 --web-port 9090 + ports: + - 9090:9090 + proxy1: + depends_on: [broker] + image: samply/beam-proxy:develop + ports: + - 8081:8081 + environment: + BROKER_URL: ${BROKER_URL} + PROXY_ID: ${PROXY1_ID} + APP_app1_KEY: ${APP_KEY} + APP_app2_KEY: ${APP_KEY} + PRIVKEY_FILE: /run/secrets/proxy1.pem + BIND_ADDR: 0.0.0.0:8081 + RUST_LOG: ${RUST_LOG} + ALL_PROXY: http://mitmproxy:8080 + secrets: + - proxy1.pem + - root.crt.pem + proxy2: + depends_on: [broker] + image: samply/beam-proxy:develop + ports: + - 8082:8082 + environment: + BROKER_URL: ${BROKER_URL} + PROXY_ID: ${PROXY2_ID} + APP_app1_KEY: ${APP_KEY} + APP_app2_KEY: ${APP_KEY} + PRIVKEY_FILE: /run/secrets/proxy2.pem + BIND_ADDR: 0.0.0.0:8082 + RUST_LOG: ${RUST_LOG} + ALL_PROXY: http://mitmproxy:8080 + secrets: + - proxy2.pem + - root.crt.pem + focus: + depends_on: + - proxy1 + - blaze + build: + context: ../ + dockerfile: Dockerfile + image: samply/focus:${TAG} + environment: + API_KEY: ${APP_KEY} + BEAM_APP_ID_LONG: app1.${PROXY1_ID} + BLAZE_URL: "http://blaze:8080/fhir/" + BEAM_PROXY_URL: http://proxy1:8081 + RETRY_COUNT: 30 + OBFUSCATE: "no" + blaze: + image: samply/blaze + volumes: + - "blaze-data:/app/data" +secrets: + pki.secret: + file: ./pki/pki.secret + proxy1.pem: + file: ./pki/${PROXY1_ID_SHORT}.priv.pem + proxy2.pem: + file: ./pki/${PROXY2_ID_SHORT}.priv.pem + dummy.pem: + file: ./pki/dummy.priv.pem + root.crt.pem: + file: ./pki/root.crt.pem + +volumes: + blaze-data: diff --git a/dev/focusdev b/dev/focusdev new file mode 100755 index 0000000..4de2109 --- /dev/null +++ b/dev/focusdev @@ -0,0 +1,137 @@ +#!/bin/bash -e + +# https://stackoverflow.com/questions/59895/ +SOURCE=${BASH_SOURCE[0]} +while [ -L "$SOURCE" ]; do # resolve $SOURCE until the file is no longer a symlink + DIR=$( cd -P "$( 
dirname "$SOURCE" )" >/dev/null 2>&1 && pwd ) + SOURCE=$(readlink "$SOURCE") + [[ $SOURCE != /* ]] && SOURCE=$DIR/$SOURCE # if $SOURCE was a relative symlink, we need to resolve it relative to the path where the symlink file was located +done +SD=$( cd -P "$( dirname "$SOURCE" )" >/dev/null 2>&1 && pwd ) + +cd $SD + +export PROXY1_ID=${PROXY1_ID:-proxy1.broker} +export PROXY2_ID=${PROXY2_ID:-proxy2.broker} +export PROXY1_ID_SHORT=$(echo $PROXY1_ID | cut -d '.' -f 1) +export PROXY2_ID_SHORT=$(echo $PROXY2_ID | cut -d '.' -f 1) +export BROKER_ID=$(echo $PROXY1_ID | cut -d '.' -f 2-) +export BROKER_URL=http://broker:8080 +export APP1_ID_SHORT=app1 +export APP2_ID_SHORT=app2 +export APP1_P1=${APP1_ID_SHORT}.$PROXY1_ID +export APP2_P1=${APP2_ID_SHORT}.$PROXY1_ID +export APP1_P2=${APP1_ID_SHORT}.$PROXY2_ID +export APP2_P2=${APP2_ID_SHORT}.$PROXY2_ID +export APP_KEY=App1Secret +export BROKER_MONITORING_KEY=SuperSecretKey +export RUST_LOG=${RUST_LOG:-info} + +export VAULT_TOKEN=$(echo "ABCDEFGHIJ" | md5sum | head -c 20; echo;) + +export P1="http://localhost:8081" # for scripts +export P2="http://localhost:8082" # for scripts + +export ARCH=$(docker version --format "{{.Server.Arch}}") + +function image_for_docker() { + # Pick the correct Ubuntu version for the Docker image, + # so the locally-built rust binary works regarding libssl + if [[ "$(pkg-config --modversion libssl)" =~ ^3.* ]]; then + echo "ubuntu:latest" # Use libssl3 + else + echo -n "" # Don't change (uses libssl1.1) + fi +} + +export IMGNAME="$(image_for_docker)" + +function check_prereqs() { + set +e + if [[ "$(curl --version)" != *" libcurl/"* ]]; then + echo "curl not found -- please install curl >= 7.7.0 and put into PATH." + exit 1 + fi + if [[ "$(jq --version)" != *"jq-"* ]]; then + echo "jq not found -- please install and put into PATH." + exit 1 + fi + if [[ "$(rsync --version)" != *"rsync version"* ]]; then + echo "rsync not found -- please install and put into PATH." 
+ exit 1 + fi + set -e +} + +function build() { + BUILD_DOCKER=0 + BACK=$(pwd) + cd $SD/.. + FOCUS=./target/debug/focus + if [ ! -x ./artifacts/binaries-$ARCH ]; then + echo "Binaries missing -- building ..." + BUILD="$(cargo build $@ --message-format=json)" + echo "Will rebuild docker image since binaries had not been there." + mkdir -p artifacts/binaries-$ARCH + rsync "$FOCUS" artifacts/binaries-$ARCH/ + BUILD_DOCKER=1 + elif [ -z "$(docker images -q samply/focus:$TAG)" ]; then + echo "Will rebuild docker image since it is missing." + BUILD_DOCKER=1 + elif [ -x ./target ]; then + echo "Checking for changed Rust source code ..." + BUILD="$(cargo build $@ --message-format=json)" + if echo $BUILD | jq 'select(.fresh==false)' | grep -q 'fresh'; then + echo "Will rebuild docker image due to changes in rust binaries." + rsync "$FOCUS" artifacts/binaries-$ARCH/ + BUILD_DOCKER=1 + fi + fi + if [ $BUILD_DOCKER -eq 1 ]; then + build_docker + else + echo "Not rebuilding docker image since nothing has changed." 
+ fi + cd $BACK +} + +function build_docker() { + BACK2=$(pwd) + cd $SD + if [ -z "$IMGNAME" ]; then + docker-compose build --build-arg TARGETARCH=$ARCH + else + docker-compose build --build-arg TARGETARCH=$ARCH --build-arg IMGNAME=$IMGNAME + fi + cd $BACK2 +} + +function start { + echo "$VAULT_TOKEN" > pki/pki.secret + pki/pki setup + build $@ + docker compose up --no-build --no-recreate -d + docker compose logs focus -f +} + +check_prereqs + +export TAG=${TAG:-localbuild} + +case "$1" in + build) + shift + build $@ + ;; + start) + shift + start $@ + ;; + restart) + docker compose build --build-arg TARGETARCH=$ARCH --build-arg IMGNAME=$IMGNAME focus + docker compose up --no-deps --no-build focus + ;; + *) + echo "Usage: $0 build|start" + ;; +esac diff --git a/dev/pki/pki b/dev/pki/pki new file mode 100755 index 0000000..cd8643f --- /dev/null +++ b/dev/pki/pki @@ -0,0 +1,123 @@ +#!/bin/bash -e + +SCRIPT_DIR=$( cd -- "$( dirname -- "${BASH_SOURCE[0]}" )" &> /dev/null && pwd ) +#ME="$(basename "$(test -L "$0" && readlink "$0" || echo "$0")")" + +[ -z "$PROXY1_ID" ] && ( echo "PROXY1_ID not set!"; exit 1) +[ -z "$PROXY2_ID" ] && ( echo "PROXY2_ID not set!"; exit 1) + +cd $SCRIPT_DIR +export PROXY1_ID_SHORT=$(echo $PROXY1_ID | cut -d '.' -f 1) +export PROXY2_ID_SHORT=$(echo $PROXY2_ID | cut -d '.' -f 1) +export BROKER_ID=$(echo $PROXY1_ID | cut -d '.' 
-f 2-) +export VAULT_ADDR=http://127.0.0.1:8200 + +function start() { + docker-compose up -d --no-build vault +} + +function clean() { + rm -vf *.pem *.json + docker-compose down +} + +function create_root_ca() { + vault secrets enable pki + vault secrets tune -max-lease-ttl=87600h pki + vault write -field=certificate pki/root/generate/internal \ + common_name="Broker-Root" \ + issuer_name="root-2022" \ + ttl=87600h > dktk_root_2022_ca.crt.pem + vault write pki/roles/2022-servers_root allow_any_name=true + cp dktk_root_2022_ca.crt.pem root.crt.pem +} + +function create_intermediate_ca() { + vault secrets enable -path=samply_pki pki + vault secrets tune -max-lease-ttl=43800h samply_pki + vault write -format=json samply_pki/intermediate/generate/internal \ + common_name="$BROKER_ID Intermediate Authority" \ + issuer_name="$BROKER_ID-intermediate" \ + | jq -r '.data.csr' > pki_hd_intermediate.csr.pem + vault write -format=json pki/root/sign-intermediate \ + issuer_ref="root-2022" \ + csr=@pki_hd_intermediate.csr.pem \ + format=pem_bundle ttl="43800h" \ + | jq -r '.data.certificate' > hd_intermediate.crt.pem + vault write samply_pki/intermediate/set-signed certificate=@hd_intermediate.crt.pem + vault write samply_pki/roles/hd-dot-dktk-dot-com \ + issuer_ref="$(vault read -field=default samply_pki/config/issuers)" \ + allowed_domains="$BROKER_ID" \ + allow_subdomains=true \ + allow_glob_domains=true \ + max_ttl="720h" +} + +function request_proxy() { + application="${1:-app1}" + ttl="${2:-24h}" + cn="${application}.$BROKER_ID" + request "$application" "$cn" "$ttl" +} + +function request() { + application=$1 + cn=$2 + ttl=$3 + data="{\"common_name\": \"$cn\", \"ttl\": \"$ttl\"}" + echo $data + echo "Creating Certificate for domain $cn" + curl --header "X-Vault-Token: $VAULT_TOKEN" \ + --request POST \ + --data "$data" \ + --no-progress-meter \ + $VAULT_ADDR/v1/samply_pki/issue/hd-dot-dktk-dot-com | jq > ${application}.json + cat ${application}.json | jq -r 
.data.certificate > ${application}.crt.pem + cat ${application}.json | jq -r .data.ca_chain[] > ${application}.chain.pem + cat ${application}.json | jq -r .data.private_key > ${application}.priv.pem + echo "Success: PEM files stored to ${application}*.pem" +} + +function setup() { + clean + touch ${PROXY1_ID_SHORT}.priv.pem # see https://github.com/docker/compose/issues/8305 + touch ${PROXY2_ID_SHORT}.priv.pem # see https://github.com/docker/compose/issues/8305 + # touch test0.priv.pem # see https://github.com/docker/compose/issues/8305 + # touch test1.priv.pem # see https://github.com/docker/compose/issues/8305 + # touch root.crt.pem # see https://github.com/docker/compose/issues/8304 + start + while ! [ "$(curl -s $VAULT_ADDR/v1/sys/health | jq -r .sealed)" == "false" ]; do echo "Waiting ..."; sleep 0.1; done + docker compose exec -T vault sh -c "https_proxy=$http_proxy apk add --no-cache bash curl jq" + docker compose exec -T vault sh -c "VAULT_TOKEN=$VAULT_TOKEN http_proxy= HTTP_PROXY= PROXY1_ID=$PROXY1_ID PROXY2_ID=$PROXY2_ID /pki/pki init" + docker compose exec -T vault sh -c "VAULT_TOKEN=$VAULT_TOKEN http_proxy= HTTP_PROXY= PROXY1_ID=$PROXY1_ID PROXY2_ID=$PROXY2_ID /pki/pki request_proxy $PROXY1_ID_SHORT" "1y" + docker compose exec -T vault sh -c "VAULT_TOKEN=$VAULT_TOKEN http_proxy= HTTP_PROXY= PROXY1_ID=$PROXY1_ID PROXY2_ID=$PROXY2_ID /pki/pki request_proxy $PROXY2_ID_SHORT" "1y" + docker compose exec -T vault sh -c "VAULT_TOKEN=$VAULT_TOKEN http_proxy= HTTP_PROXY= PROXY1_ID=$PROXY1_ID PROXY2_ID=$PROXY2_ID /pki/pki request_proxy dummy" "1y" +} + +function init() { + echo "Creating Root CA" + create_root_ca + + echo "Creating Intermediate HD CA" + create_intermediate_ca + + echo "Successfully completed 'init'." 
+} + +case "$1" in + clean) + clean + ;; + setup) + setup + ;; + init) + init + ;; + request_proxy) + request_proxy $2 $3 + ;; + *) + echo "Usage: $0 setup|clean|init" + ;; +esac diff --git a/src/main.rs b/src/main.rs index 21873f3..e8e1063 100644 --- a/src/main.rs +++ b/src/main.rs @@ -59,6 +59,11 @@ pub async fn main() -> ExitCode { } async fn main_loop() -> ExitCode { + warn!("asdfasdfafd"); + warn!("asdfasdfafd"); + warn!("asdfasdfafd"); + warn!("asdfasdfafd"); + warn!("asdfasdfafd"); let mut obf_cache: ObfCache = ObfCache { cache: HashMap::new(), }; From 2dab0629d40eb8382e9f1f94513dc82138a39105 Mon Sep 17 00:00:00 2001 From: janskiba Date: Wed, 20 Sep 2023 07:46:52 +0000 Subject: [PATCH 04/22] Rebuild fsource on `focusdev restart` if needed --- dev/focusdev | 5 +++-- dev/pki/pki | 3 --- src/main.rs | 5 ----- 3 files changed, 3 insertions(+), 10 deletions(-) diff --git a/dev/focusdev b/dev/focusdev index 4de2109..4e8ed23 100755 --- a/dev/focusdev +++ b/dev/focusdev @@ -128,10 +128,11 @@ case "$1" in start $@ ;; restart) - docker compose build --build-arg TARGETARCH=$ARCH --build-arg IMGNAME=$IMGNAME focus + shift + build $@ docker compose up --no-deps --no-build focus ;; *) - echo "Usage: $0 build|start" + echo "Usage: $0 build|start|restart" ;; esac diff --git a/dev/pki/pki b/dev/pki/pki index cd8643f..da98242 100755 --- a/dev/pki/pki +++ b/dev/pki/pki @@ -82,9 +82,6 @@ function setup() { clean touch ${PROXY1_ID_SHORT}.priv.pem # see https://github.com/docker/compose/issues/8305 touch ${PROXY2_ID_SHORT}.priv.pem # see https://github.com/docker/compose/issues/8305 - # touch test0.priv.pem # see https://github.com/docker/compose/issues/8305 - # touch test1.priv.pem # see https://github.com/docker/compose/issues/8305 - # touch root.crt.pem # see https://github.com/docker/compose/issues/8304 start while ! 
[ "$(curl -s $VAULT_ADDR/v1/sys/health | jq -r .sealed)" == "false" ]; do echo "Waiting ..."; sleep 0.1; done docker compose exec -T vault sh -c "https_proxy=$http_proxy apk add --no-cache bash curl jq" diff --git a/src/main.rs b/src/main.rs index e8e1063..21873f3 100644 --- a/src/main.rs +++ b/src/main.rs @@ -59,11 +59,6 @@ pub async fn main() -> ExitCode { } async fn main_loop() -> ExitCode { - warn!("asdfasdfafd"); - warn!("asdfasdfafd"); - warn!("asdfasdfafd"); - warn!("asdfasdfafd"); - warn!("asdfasdfafd"); let mut obf_cache: ObfCache = ObfCache { cache: HashMap::new(), }; From 9ee0ee760f1c4a7fac91a0aa359873af0f11c675 Mon Sep 17 00:00:00 2001 From: janskiba Date: Wed, 20 Sep 2023 08:39:58 +0000 Subject: [PATCH 05/22] Upgrade distoless to debian 12 This allows droping the extra logic to link against a diffrent libssl version --- Dockerfile | 2 +- dev/focusdev | 20 ++------------------ 2 files changed, 3 insertions(+), 19 deletions(-) diff --git a/Dockerfile b/Dockerfile index e84eb22..db66d8e 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,5 +1,5 @@ # This assumes binaries are present, see COPY directive. -ARG IMGNAME=gcr.io/distroless/cc +ARG IMGNAME=gcr.io/distroless/cc-debian12 FROM alpine AS chmodder ARG TARGETARCH diff --git a/dev/focusdev b/dev/focusdev index 4e8ed23..3cdb756 100755 --- a/dev/focusdev +++ b/dev/focusdev @@ -34,18 +34,6 @@ export P2="http://localhost:8082" # for scripts export ARCH=$(docker version --format "{{.Server.Arch}}") -function image_for_docker() { - # Pick the correct Ubuntu version for the Docker image, - # so the locally-built rust binary works regarding libssl - if [[ "$(pkg-config --modversion libssl)" =~ ^3.* ]]; then - echo "ubuntu:latest" # Use libssl3 - else - echo -n "" # Don't change (uses libssl1.1) - fi -} - -export IMGNAME="$(image_for_docker)" - function check_prereqs() { set +e if [[ "$(curl --version)" != *" libcurl/"* ]]; then @@ -67,7 +55,7 @@ function build() { BUILD_DOCKER=0 BACK=$(pwd) cd $SD/.. 
- FOCUS=./target/debug/focus + FOCUS=./target/release/focus if [ ! -x ./artifacts/binaries-$ARCH ]; then echo "Binaries missing -- building ..." BUILD="$(cargo build $@ --message-format=json)" @@ -98,11 +86,7 @@ function build() { function build_docker() { BACK2=$(pwd) cd $SD - if [ -z "$IMGNAME" ]; then - docker-compose build --build-arg TARGETARCH=$ARCH - else - docker-compose build --build-arg TARGETARCH=$ARCH --build-arg IMGNAME=$IMGNAME - fi + docker-compose build --build-arg TARGETARCH=$ARCH cd $BACK2 } From c37d8c3303cb0ced5bcf08e03e6f6560eea784cf Mon Sep 17 00:00:00 2001 From: janskiba Date: Wed, 20 Sep 2023 08:42:42 +0000 Subject: [PATCH 06/22] Update CI images to link against libssl 3 --- .github/workflows/rust.yml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/.github/workflows/rust.yml b/.github/workflows/rust.yml index f6bf92f..75e3a66 100644 --- a/.github/workflows/rust.yml +++ b/.github/workflows/rust.yml @@ -22,7 +22,7 @@ jobs: build-rust: name: Build (Rust) - runs-on: ubuntu-20.04 + runs-on: ubuntu-latest strategy: matrix: @@ -86,7 +86,7 @@ jobs: test: name: Run tests needs: [ build-rust ] - runs-on: ubuntu-22.04 + runs-on: ubuntu-latest steps: - uses: actions/checkout@v3 From 295ec33faeda291814ebc3edbcc62fa5656f96e8 Mon Sep 17 00:00:00 2001 From: janskiba Date: Wed, 20 Sep 2023 12:48:04 +0000 Subject: [PATCH 07/22] Use fix version to build rust --- .github/workflows/rust.yml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/.github/workflows/rust.yml b/.github/workflows/rust.yml index 75e3a66..b27b78c 100644 --- a/.github/workflows/rust.yml +++ b/.github/workflows/rust.yml @@ -22,7 +22,7 @@ jobs: build-rust: name: Build (Rust) - runs-on: ubuntu-latest + runs-on: ubuntu-22.04 strategy: matrix: @@ -86,7 +86,7 @@ jobs: test: name: Run tests needs: [ build-rust ] - runs-on: ubuntu-latest + runs-on: ubuntu-22.04 steps: - uses: actions/checkout@v3 From ffd285f93971466198fd398b6774b70d64ec717e Mon Sep 17 
00:00:00 2001 From: janskiba Date: Wed, 20 Sep 2023 13:10:16 +0000 Subject: [PATCH 08/22] Add `focusdev stop` --- dev/focusdev | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/dev/focusdev b/dev/focusdev index 3cdb756..6b88851 100755 --- a/dev/focusdev +++ b/dev/focusdev @@ -116,7 +116,10 @@ case "$1" in build $@ docker compose up --no-deps --no-build focus ;; + stop) + docker compose down + ;; *) - echo "Usage: $0 build|start|restart" + echo "Usage: $0 build|start|restart|stop" ;; esac From 2c73c9da91968703d60d51882c6198798e710e37 Mon Sep 17 00:00:00 2001 From: janskiba Date: Wed, 20 Sep 2023 13:14:59 +0000 Subject: [PATCH 09/22] Remove `IMGNAME` build arg --- Dockerfile | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/Dockerfile b/Dockerfile index db66d8e..dc15b04 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,12 +1,10 @@ # This assumes binaries are present, see COPY directive. -ARG IMGNAME=gcr.io/distroless/cc-debian12 - FROM alpine AS chmodder ARG TARGETARCH COPY /artifacts/binaries-$TARGETARCH/focus /app/ RUN chmod +x /app/* -FROM ${IMGNAME} +FROM gcr.io/distroless/cc-debian12 COPY --from=chmodder /app/* /usr/local/bin/ ENTRYPOINT [ "/usr/local/bin/focus" ] From f7638c76d115c6535cd238690498092b1e1d5aeb Mon Sep 17 00:00:00 2001 From: lablans Date: Mon, 25 Sep 2023 12:39:01 +0000 Subject: [PATCH 10/22] Fix compiler errors --- src/beam.rs | 10 +++++----- src/main.rs | 19 ++++++++----------- 2 files changed, 13 insertions(+), 16 deletions(-) diff --git a/src/beam.rs b/src/beam.rs index 2e1bfc9..e9eda1a 100644 --- a/src/beam.rs +++ b/src/beam.rs @@ -141,25 +141,25 @@ impl BeamResult { body: None, } } - pub fn succeeded(from: AppId, to: Vec, task: Uuid, body: String) -> Self { + pub fn succeeded(from: AppId, to: Vec, task: Uuid, body: Option) -> Self { Self { from, to, task, status: Status::Succeeded, metadata: None, - body: Some(body), + body, } } - pub fn perm_failed(from: AppId, to: Vec, task: Uuid, body: String) -> Self { 
+ pub fn perm_failed(from: AppId, to: Vec, task: Uuid, body: Option) -> Self { Self { from, to, task, status: Status::PermFailed, metadata: None, - body: Some(body), + body, } } } @@ -279,7 +279,7 @@ pub async fn answer_task(task: &BeamTask, result: &BeamResult) -> Result<(), Foc pub async fn fail_task(task: &BeamTask, body: impl Into) -> Result<(), FocusError> { let body = body.into(); warn!("Reporting failed task with id {}: {}", task.id, body); - let result = BeamResult::perm_failed(CONFIG.beam_app_id_long.clone(), vec![task.from.clone()], task.id, body); + let result = BeamResult::perm_failed(CONFIG.beam_app_id_long.clone(), vec![task.from.clone()], task.id, Some(body)); let url = format!( "{}v1/tasks/{}/results/{}", CONFIG.beam_proxy_url, task.id, CONFIG.beam_app_id_long diff --git a/src/main.rs b/src/main.rs index 21873f3..8004aad 100644 --- a/src/main.rs +++ b/src/main.rs @@ -228,7 +228,7 @@ async fn run_query( CONFIG.beam_app_id_long.clone(), vec![task.from.clone()], task.id, - format!("Can't run inqueries with language {}", query.lang), + Some(format!("Can't run inquiries with language {}", query.lang)), )); } } @@ -239,13 +239,6 @@ async fn run_cql_query( obf_cache: &mut ObfCache, report_cache: &mut ReportCache, ) -> Result { - let mut err = beam::BeamResult::perm_failed( - CONFIG.beam_app_id_long.clone(), - vec![task.to_owned().from], - task.to_owned().id, - String::new(), - ); - let encoded_query = query.lib["content"][0]["data"] .as_str() @@ -302,8 +295,12 @@ async fn run_cql_query( }; let result = beam_result(task.to_owned(), cql_result_new).unwrap_or_else(|e| { - err.body = e.to_string(); - return err; + beam::BeamResult::perm_failed( + CONFIG.beam_app_id_long.clone(), + vec![task.to_owned().from], + task.to_owned().id, + Some(e.to_string()) + ) }); Ok(result) @@ -356,6 +353,6 @@ fn beam_result( CONFIG.beam_app_id_long.clone(), vec![task.from], task.id, - data, + Some(data) )); } From 72ace792802b4c95045aed1fddfa429c4de1eb3d Mon Sep 17 00:00:00 2001 
From: lablans Date: Mon, 25 Sep 2023 12:46:02 +0000 Subject: [PATCH 11/22] Reformat query.lang --- src/main.rs | 23 +++++++++++++---------- 1 file changed, 13 insertions(+), 10 deletions(-) diff --git a/src/main.rs b/src/main.rs index 21873f3..173ea72 100644 --- a/src/main.rs +++ b/src/main.rs @@ -220,16 +220,19 @@ async fn run_query( ) -> Result { debug!("Run"); - if query.lang == "cql" { - // TODO: Change query.lang to an enum - return Ok(run_cql_query(task, query, obf_cache, report_cache).await)?; - } else { - return Ok(beam::BeamResult::perm_failed( - CONFIG.beam_app_id_long.clone(), - vec![task.from.clone()], - task.id, - format!("Can't run inqueries with language {}", query.lang), - )); + match query.lang.as_str() { + "cql" => { + // TODO: Change query.lang to an enum + Ok(run_cql_query(task, query, obf_cache, report_cache).await)? + }, + unsupported_lang => { + Ok(beam::BeamResult::perm_failed( + CONFIG.beam_app_id_long.clone(), + vec![task.from.clone()], + task.id, + format!("Can't run queries with language {unsupported_lang}"), + )) + } } } From d322b1bb10428adbc72796fb0a0685e538c3dcff Mon Sep 17 00:00:00 2001 From: lablans Date: Mon, 25 Sep 2023 12:51:49 +0000 Subject: [PATCH 12/22] Allow to build docker image manually for branches --- .github/workflows/rust.yml | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/.github/workflows/rust.yml b/.github/workflows/rust.yml index b27b78c..661954d 100644 --- a/.github/workflows/rust.yml +++ b/.github/workflows/rust.yml @@ -2,6 +2,7 @@ name: Build with rust and docker on: push: + workflow_dispatch: pull_request: schedule: # Fetch new base image updates every night at 1am @@ -98,7 +99,7 @@ jobs: docker-focus: needs: [ build-rust, pre-check, test ] - if: github.ref_protected == true + if: github.ref_protected == true || github.event_name == 'workflow_dispatch' # This workflow defines how a maven package is built, tested and published. 
# Visit: https://github.com/samply/github-workflows/blob/develop/.github/workflows/docker-ci.yml, for more information From 51d1aa78fdae66cdf3bad77ac82f397be87f07d6 Mon Sep 17 00:00:00 2001 From: lablans Date: Mon, 25 Sep 2023 12:52:53 +0000 Subject: [PATCH 13/22] Be explicit about Ubuntu image usage --- .github/workflows/rust.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/rust.yml b/.github/workflows/rust.yml index 661954d..6acbc50 100644 --- a/.github/workflows/rust.yml +++ b/.github/workflows/rust.yml @@ -15,7 +15,7 @@ env: jobs: pre-check: name: Security, License Check - runs-on: ubuntu-latest + runs-on: ubuntu-22.04 steps: - uses: actions/checkout@v3 From 1078f5c7849610129fa1b1615e5d52f0e9a13a98 Mon Sep 17 00:00:00 2001 From: lablans Date: Mon, 25 Sep 2023 12:58:44 +0000 Subject: [PATCH 14/22] Build and run tests faster --- .github/workflows/rust.yml | 32 ++++++++++++++++++++++++++------ 1 file changed, 26 insertions(+), 6 deletions(-) diff --git a/.github/workflows/rust.yml b/.github/workflows/rust.yml index 6acbc50..bebe260 100644 --- a/.github/workflows/rust.yml +++ b/.github/workflows/rust.yml @@ -71,18 +71,34 @@ jobs: with: key: ${{ matrix.arch }}-${{ env.PROFILE }} prefix-key: "v1-rust" # Increase to invalidate old caches. 
- - name: Build (${{ matrix.arch }}) + - name: Build (cross to ${{ matrix.arch }}) + if: env.is_cross == 'true' uses: actions-rs/cargo@v1 with: use-cross: ${{ env.is_cross }} command: build - args: --target ${{ env.rustarch }} ${{ env.profilestr }} - - name: Upload Artifact + args: --target ${{ env.rustarch }} ${{ matrix.features && format('--features {0}', matrix.features) }} ${{ env.profilestr }} + - name: Build (native) + if: env.is_cross == 'false' + run: | + BINS=$(cargo build --tests --bins --message-format=json --target ${{ env.rustarch }} ${{ matrix.features && format('--features {0}', matrix.features) }} ${{ env.profilestr }} | jq -r 'select(.profile.test == true) | .executable | select(. != null)') + mkdir -p testbinaries/ + for testbin in $BINS; do + mv -v $testbin testbinaries/ + done + - name: Upload (bins) uses: actions/upload-artifact@v3 with: name: binaries-${{ matrix.arch }} path: | target/${{ env.rustarch }}/${{ env.PROFILE }}/focus + - name: Upload (test, native only) + if: matrix.arch == 'amd64' + uses: actions/upload-artifact@v3 + with: + name: testbinaries-${{ matrix.arch }} + path: | + testbinaries/* test: name: Run tests @@ -93,9 +109,13 @@ jobs: - uses: actions/checkout@v3 - uses: actions/download-artifact@v3 with: - name: binaries-amd64 - path: artifacts/binaries-amd64/ - - run: cargo test + name: testbinaries-amd64 + path: testbinaries/ + - run: | + for testbin in testbinaries/*; do + chmod +x $testbin + $testbin + done docker-focus: needs: [ build-rust, pre-check, test ] From 095ba851a4063f66117e3a08ecc7905784463526 Mon Sep 17 00:00:00 2001 From: lablans Date: Wed, 27 Sep 2023 08:34:40 +0000 Subject: [PATCH 15/22] Add build hint to Dockerfile --- Dockerfile | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/Dockerfile b/Dockerfile index dc15b04..24f078a 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,4 +1,10 @@ -# This assumes binaries are present, see COPY directive. 
+# This Dockerfile is infused with magic to speedup the build. +# In particular, it requires built binaries to be present (see COPY directive). +# +# tl;dr: To make this build work, run +# ./dev/focusdev build +# and find your freshly built images tagged with the `localbuild` tag. + FROM alpine AS chmodder ARG TARGETARCH COPY /artifacts/binaries-$TARGETARCH/focus /app/ From 993a0bdcf256afc220715e92eb48f32c706734b3 Mon Sep 17 00:00:00 2001 From: lablans Date: Wed, 27 Sep 2023 08:35:05 +0000 Subject: [PATCH 16/22] Add build hint to Dockerfile --- Dockerfile | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/Dockerfile b/Dockerfile index dc15b04..24f078a 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,4 +1,10 @@ -# This assumes binaries are present, see COPY directive. +# This Dockerfile is infused with magic to speedup the build. +# In particular, it requires built binaries to be present (see COPY directive). +# +# tl;dr: To make this build work, run +# ./dev/focusdev build +# and find your freshly built images tagged with the `localbuild` tag. 
+ FROM alpine AS chmodder ARG TARGETARCH COPY /artifacts/binaries-$TARGETARCH/focus /app/ From 223acf671b76e4db0752478329768d55a1c3bcf6 Mon Sep 17 00:00:00 2001 From: Torben Brenner Date: Tue, 25 Jul 2023 17:51:52 +0200 Subject: [PATCH 17/22] fix: search for age at primary diagnosis --- src/util.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/util.rs b/src/util.rs index 3ce4574..5996a7d 100644 --- a/src/util.rs +++ b/src/util.rs @@ -103,7 +103,7 @@ pub(crate) fn replace_cql(decoded_library: impl Into) -> String { ("EXLIQUID_CQL", "define ExliquidSpecimen:\n from [Specimen] S\n where S.identifier.system contains 'http://dktk.dkfz.de/fhir/sid/exliquid-specimen'\ndefine retrieveCondition:\n First(from [Condition] C\n return C.code.coding.where(system = 'http://fhir.de/CodeSystem/bfarm/icd-10-gm').code.first())\ndefine Diagnosis:\n if (retrieveCondition is null) then 'unknown' else retrieveCondition\ndefine function SampleType(specimen FHIR.Specimen):\n specimen.type.coding.where(system = 'https://fhir.bbmri.de/CodeSystem/SampleMaterialType').code.first()"), ("EXLIQUID_ALIQUOTS_CQL", "define Aliquot: [Specimen] S where exists S.collection.quantity.value and exists S.parent.reference and S.container.specimenQuantity.value > 0 define AliquotGroupReferences: flatten Aliquot S return S.parent.reference define AliquotGroupWithAliquot: [Specimen] S where not (S.identifier.system contains 'http://dktk.dkfz.de/fhir/sid/exliquid-specimen') and not exists S.collection.quantity.value and not exists S.container.specimenQuantity.value and AliquotGroupReferences contains 'Specimen/' + S.id define PrimarySampleReferences: flatten AliquotGroupWithAliquot S return S.parent.reference define ExliquidSpecimenWithAliquot: from [Specimen] PrimarySample where PrimarySample.identifier.system contains 'http://dktk.dkfz.de/fhir/sid/exliquid-specimen' and PrimarySampleReferences contains 'Specimen/' + PrimarySample.id define retrieveCondition: First(from [Condition] C 
return C.code.coding.where(system = 'http://fhir.de/CodeSystem/bfarm/icd-10-gm').code.first()) define Diagnosis: if (retrieveCondition is null) then 'unknown' else retrieveCondition define function SampleType(specimen FHIR.Specimen): specimen.type.coding.where(system = 'https://fhir.bbmri.de/CodeSystem/SampleMaterialType').code.first()"), ("DKTK_STRAT_GENDER_STRATIFIER", "define Gender:\nif (Patient.gender is null) then 'unknown' else Patient.gender"), - ("DKTK_STRAT_AGE_STRATIFIER", "define AgeClass:\nif (Patient.birthDate is null) then 'unknown' else ToString((AgeInYears() div 10) * 10)"), + ("DKTK_STRAT_AGE_STRATIFIER", "define PrimaryDiagnosis:\nFirst(\nfrom [Condition] C\nwhere C.extension.where(url='http://hl7.org/fhir/StructureDefinition/condition-related').empty()\nsort by date from onset asc)\n\ndefine AgeClass:\nif (PrimaryDiagnosis.onset is null) then 'unknown' else ToString((AgeInYearsAt(FHIRHelpers.ToDateTime(PrimaryDiagnosis.onset)) div 10) * 10)"), ("DKTK_STRAT_DECEASED_STRATIFIER", "define PatientDeceased:\nFirst (from [Observation: Code '75186-7' from loinc] O return O.value.coding.where(system = 'http://dktk.dkfz.de/fhir/onco/core/CodeSystem/VitalstatusCS').code.first())\ndefine Deceased:\nif (PatientDeceased is null) then 'unbekannt' else PatientDeceased"), ("DKTK_STRAT_DIAGNOSIS_STRATIFIER", "define Diagnosis:\nif InInitialPopulation then [Condition] else {} as List\n\ndefine function DiagnosisCode(condition FHIR.Condition):\ncondition.code.coding.where(system = 'http://fhir.de/CodeSystem/bfarm/icd-10-gm').code.first()"), ("DKTK_STRAT_SPECIMEN_STRATIFIER", "define Specimen:\nif InInitialPopulation then [Specimen] else {} as List\n\ndefine function SampleType(specimen FHIR.Specimen):\nspecimen.type.coding.where(system = 'https://fhir.bbmri.de/CodeSystem/SampleMaterialType').code.first()"), From fc510bc16eb7af61c1cc13751ec5e6c02f27782b Mon Sep 17 00:00:00 2001 From: Jan <59206115+Threated@users.noreply.github.com> Date: Thu, 28 Sep 2023 12:52:25 
+0200 Subject: [PATCH 18/22] Use beam-lib for beam interactions (#44) * Use `beam_lib` for beam interactions --------- Co-authored-by: janskiba Co-authored-by: Tobias Kussel --- Cargo.toml | 8 +- src/beam.rs | 360 ++++++--------------------------------- src/blaze.rs | 15 +- src/config.rs | 14 +- src/errors.rs | 10 +- src/graceful_shutdown.rs | 4 +- src/main.rs | 48 +++--- src/util.rs | 2 +- 8 files changed, 97 insertions(+), 364 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 53ebfe1..0b9e32f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -9,12 +9,12 @@ license = "Apache-2.0" [dependencies] base64 = { version = "0.21.0", default_features = false } http = "0.2" -reqwest = { version = "0.11.14", default_features = false, features = ["json", "default-tls"] } +reqwest = { version = "0.11", default_features = false, features = ["json", "default-tls"] } serde = { version = "1.0.152", features = ["serde_derive"] } -serde_json = "1.0.91" +serde_json = "1.0" thiserror = "1.0.38" tokio = { version = "1.25.0", default_features = false, features = ["signal", "rt-multi-thread", "macros"] } -uuid = { version = "1.3.0", default_features = false, features = ["serde"]} +beam-lib = { git = "https://github.com/samply/beam", branch = "develop", features = ["http-util"] } laplace_rs = {version = "0.2.0", git = "https://github.com/samply/laplace-rs.git", branch = "main" } # Logging @@ -22,7 +22,7 @@ tracing = { version = "0.1.37", default_features = false } tracing-subscriber = { version = "0.3.11", default_features = false, features = ["env-filter", "fmt"] } # Global variables -static_init = "1.0.2" +once_cell = "1.18" # Command Line Interface clap = { version = "4.0", default_features = false, features = ["std", "env", "derive"] } diff --git a/src/beam.rs b/src/beam.rs index 2bb0d26..4558dca 100644 --- a/src/beam.rs +++ b/src/beam.rs @@ -1,164 +1,45 @@ -use std::{fmt::Display}; +use std::time::Duration; -use http::{HeaderValue, StatusCode}; -use reqwest::header::{HeaderMap, 
AUTHORIZATION}; -use serde::{de, Deserializer, Deserialize, Serialize, Serializer}; -use tracing::{debug, warn, info}; -use uuid::Uuid; +use beam_lib::{TaskResult, BeamClient, BlockingOptions, MsgId, TaskRequest}; +use once_cell::sync::Lazy; +use serde::{de::DeserializeOwned, Serialize}; +use tracing::{debug, warn}; use crate::{config::CONFIG, errors::FocusError}; -type BrokerId = String; +pub mod beam_result { + use beam_lib::{AppId, TaskResult, WorkStatus, MsgId}; + use serde::Serialize; + use serde_json::Value; -#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)] -pub struct ProxyId { - proxy: String, - broker: BrokerId, -} - -#[derive(Clone, Debug, PartialEq, Eq)] -pub struct AppId { - app: String, - rest: ProxyId, -} - -impl ProxyId { - pub fn get_proxy_id(&self) -> String { - format!("{}.{}", &self.proxy, &self.broker) - } - pub fn get_broker_id(&self) -> String { - self.broker.clone() - } - pub fn new(full: String) -> Result { - let mut components: Vec = full.split(".").map(|x| x.to_string()).collect(); - let rest = components.split_off(1).join("."); - Ok(ProxyId { - proxy: components - .first() - .cloned() - .ok_or_else(|| FocusError::InvalidBeamId(format!("Invalid ProxyId: {}", full)))?, - broker: rest, - }) - } -} - -impl AppId { - pub fn get_app_id(&self) -> String { - format!("{}.{}", &self.app, &self.rest.get_proxy_id()) - } - pub fn get_proxy_id(&self) -> String { - self.rest.get_proxy_id() - } - pub fn new(full: String) -> Result { - let mut components: Vec = full.split(".").map(|x| x.to_string()).collect(); - let rest = components.split_off(1).join("."); - Ok(AppId { - app: components - .first() - .cloned() - .ok_or_else(|| FocusError::InvalidBeamId(format!("Invalid ProxyId: {}", full)))?, - rest: ProxyId::new(rest)?, - }) - } -} -impl Display for ProxyId { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - write!(f, "{}.{}", self.proxy, self.broker) - } -} -impl Display for AppId { - fn fmt(&self, f: &mut 
std::fmt::Formatter<'_>) -> std::fmt::Result { - write!(f, "{}.{}", self.app, self.rest) - } -} - -impl<'de> serde::Deserialize<'de> for AppId { - fn deserialize>(d: D) -> Result { - let s = String::deserialize(d)?; - AppId::new(s).map_err(de::Error::custom) - } -} - -impl Serialize for AppId { - fn serialize(&self, serializer: S) -> Result - where S: Serializer, - { - let mut state = String::serialize(&self.to_string(), serializer)?; - Ok(state) - } -} - -#[derive(Serialize, Deserialize, Debug, Clone)] -pub struct BeamTask { - pub id: Uuid, - pub from: AppId, - pub to: Vec, - pub metadata: String, - pub body: String, - pub ttl: String, - pub failure_strategy: FailureStrategy, -} - -#[derive(Serialize, Deserialize, Debug, Clone)] -#[serde(rename_all = "lowercase")] -pub enum FailureStrategy { - Retry(Retry), -} - -#[derive(Serialize, Deserialize, Debug, Default, Clone)] -pub struct Retry { - pub backoff_millisecs: usize, - pub max_tries: usize, -} - -#[derive(Debug, Serialize, Deserialize, Clone)] -pub struct BeamResult { - pub from: AppId, - pub to: Vec, - pub task: Uuid, - pub status: Status, - pub metadata: Option, - pub body: Option, -} - -#[derive(Debug, Serialize, Deserialize, Clone, Eq, PartialEq)] -#[serde(rename_all = "lowercase")] -pub enum Status { - Claimed, - Succeeded, - TempFailed, - PermFailed, -} - -impl BeamResult { - pub fn claimed(from: AppId, to: Vec, task: Uuid) -> Self { - Self { + pub fn claimed(from: AppId, to: Vec, task_id: MsgId) -> TaskResult<()> { + TaskResult { from, to, - task, - status: Status::Claimed, - metadata: None, - body: None, + task: task_id, + status: WorkStatus::Claimed, + metadata: Value::Null, + body: (), } } - pub fn succeeded(from: AppId, to: Vec, task: Uuid, body: Option) -> Self { - Self { + pub fn succeeded(from: AppId, to: Vec, task_id: MsgId, body: T) -> TaskResult { + TaskResult { from, to, - task, - status: Status::Succeeded, - metadata: None, + task: task_id, + status: WorkStatus::Succeeded, + metadata: 
Value::Null, body, } } - pub fn perm_failed(from: AppId, to: Vec, task: Uuid, body: Option) -> Self { - Self { + pub fn perm_failed(from: AppId, to: Vec, task_id: MsgId, body: T) -> TaskResult { + TaskResult { from, to, - task, - status: Status::PermFailed, - metadata: None, + task: task_id, + status: WorkStatus::PermFailed, + metadata: Value::Null, body, } } @@ -186,185 +67,42 @@ pub async fn check_availability() -> bool { false } -pub async fn retrieve_tasks() -> Result, FocusError> { - debug!("Retrieving tasks..."); - - let mut headers = HeaderMap::new(); - headers.insert( - AUTHORIZATION, - HeaderValue::from_str(&format!("ApiKey {} {}", CONFIG.beam_app_id_long, CONFIG.api_key)) - .map_err(|e| { - FocusError::ConfigurationError(format!( - "Cannot assemble authorization header: {}", - e - )) - })?, - ); - - let url = format!( - "{}v1/tasks?filter=todo&wait_count=1&wait_time=10s", - CONFIG.beam_proxy_url - ); - let resp = CONFIG.client - .get(&url) - .headers(headers) - .send() - .await - .map_err(|e| FocusError::UnableToRetrieveTasksHttp(e))?; +static BEAM_CLIENT: Lazy = Lazy::new(|| BeamClient::new( + &CONFIG.beam_app_id_long, + &CONFIG.api_key, + CONFIG.beam_proxy_url.to_string().parse().expect("Uri always converts to url") +)); - let tasks = match resp.status() { - StatusCode::OK | StatusCode::PARTIAL_CONTENT => { - resp - .json::>() - .await - .map_err(|e| FocusError::UnableToParseTasks(e))? 
- } - code => { - return Err(FocusError::UnableToRetrieveTasksOther(format!("Got status code {code}"))); - } +pub async fn retrieve_tasks() -> Result>, FocusError> { + debug!("Retrieving tasks..."); + let block = BlockingOptions { + wait_time: Some(Duration::from_secs(10)), + wait_count: Some(1) }; - Ok(tasks) + BEAM_CLIENT.poll_pending_tasks(&block) + .await + .map_err(FocusError::UnableToRetrieveTasksHttp) } -pub async fn answer_task(task: &BeamTask, result: &BeamResult) -> Result<(), FocusError> { - let task_id = task.id.to_string(); +pub async fn answer_task(task_id: MsgId, result: &TaskResult) -> Result { debug!("Answer task with id: {task_id}"); - let result_task = result.task; - let url = format!( - "{}v1/tasks/{}/results/{}", - CONFIG.beam_proxy_url, &result_task, CONFIG.beam_app_id_long - ); - - let mut headers = HeaderMap::new(); - headers.insert( - AUTHORIZATION, - HeaderValue::from_str(&format!("ApiKey {} {}", CONFIG.beam_app_id_long, CONFIG.api_key)) - .map_err(|e| { - FocusError::ConfigurationError(format!( - "Cannot assemble authorization header: {}", - e - )) - })?, - ); - - let resp = CONFIG.client - .put(&url) - .headers(headers) - .json(&result) - .send() + BEAM_CLIENT.put_result(result, &task_id) .await - .map_err(|e| FocusError::UnableToAnswerTask(e))?; - - let status_code = resp.status(); - let status_text = status_code.as_str(); - debug!("{status_text}"); - - match status_code { - StatusCode::CREATED | StatusCode::NO_CONTENT => Ok(()), - StatusCode::BAD_REQUEST => { - let msg = resp - .text() - .await - .map_err(|e| FocusError::UnableToAnswerTask(e))?; - warn!("Error while answering the task with id: {msg}"); - Ok(()) // return error - } - _ => { - warn!("Unexpected status code: {}", resp.status()); - Ok(()) //return error - } - } + .map_err(FocusError::UnableToAnswerTask) } -pub async fn fail_task(task: &BeamTask, body: impl Into) -> Result<(), FocusError> { +pub async fn fail_task(task: &TaskRequest, body: impl Into) -> Result { let body = 
body.into(); warn!("Reporting failed task with id {}: {}", task.id, body); - let result = BeamResult::perm_failed(CONFIG.beam_app_id_long.clone(), vec![task.from.clone()], task.id, Some(body)); - let url = format!( - "{}v1/tasks/{}/results/{}", - CONFIG.beam_proxy_url, task.id, CONFIG.beam_app_id_long - ); - - let mut headers = HeaderMap::new(); - headers.insert( - AUTHORIZATION, - HeaderValue::from_str(&format!("ApiKey {} {}", CONFIG.beam_app_id_long, CONFIG.api_key)) - .map_err(|e| { - FocusError::ConfigurationError(format!( - "Cannot assemble authorization header: {}", - e - )) - })?, - ); - - let resp = CONFIG.client - .put(&url) - .headers(headers) - .json(&result) - .send() + let result = beam_result::perm_failed(CONFIG.beam_app_id_long.clone(), vec![task.from.clone()], task.id, body); + BEAM_CLIENT.put_result(&result, &task.id) .await - .map_err(|e| FocusError::UnableToAnswerTask(e))?; - - match resp.status() { - StatusCode::CREATED | StatusCode::NO_CONTENT => Ok(()), - StatusCode::BAD_REQUEST => { - let msg = resp - .text() - .await - .map_err(|e| FocusError::UnableToAnswerTask(e))?; - warn!("Error while reporting the failed task with id {}: {msg}", task.id); - Ok(()) // return error - } - _ => { - warn!("Unexpected status code: {}", resp.status()); - Ok(()) //return error - } - } + .map_err(FocusError::UnableToAnswerTask) } -pub async fn claim_task(task: &BeamTask) -> Result<(), FocusError> { - let result = BeamResult::claimed(CONFIG.beam_app_id_long.clone(), vec![task.from.clone()], task.id); - let url = format!( - "{}v1/tasks/{}/results/{}", - CONFIG.beam_proxy_url, task.id, CONFIG.beam_app_id_long - ); - - let mut headers = HeaderMap::new(); - headers.insert( - AUTHORIZATION, - HeaderValue::from_str(&format!("ApiKey {} {}", CONFIG.beam_app_id_long, CONFIG.api_key)) - .map_err(|e| { - FocusError::ConfigurationError(format!( - "Cannot assemble authorization header: {}", - e - )) - })?, - ); - - let resp = CONFIG.client - .put(&url) - .headers(headers) - 
.json(&result) - .send() +pub async fn claim_task(task: &TaskRequest) -> Result { + let result = beam_result::claimed(CONFIG.beam_app_id_long.clone(), vec![task.from.clone()], task.id); + BEAM_CLIENT.put_result(&result, &task.id) .await - .map_err(|e| FocusError::UnableToAnswerTask(e))?; - - match resp.status() { - StatusCode::CREATED | StatusCode::NO_CONTENT => { - info!("Task {} claimed", task.id); - Ok(()) - }, - StatusCode::BAD_REQUEST => { - let msg = resp - .text() - .await - .map_err(|e| FocusError::UnableToAnswerTask(e))?; - warn!("Error while reporting the claimed task with id {}: {msg}", task.id); - Ok(()) // return error - } - _ => { - warn!("Unexpected status code: {}", resp.status()); - Ok(()) //return error - } - } + .map_err(FocusError::UnableToAnswerTask) } diff --git a/src/blaze.rs b/src/blaze.rs index 9785609..72a69fd 100644 --- a/src/blaze.rs +++ b/src/blaze.rs @@ -80,12 +80,11 @@ pub async fn post_measure(measure: String) -> Result<(), FocusError> { pub async fn evaluate_measure(url: String) -> Result { debug!("Evaluating the Measure with canonical URL: {}", url); - let mut text: String = String::new(); let resp = CONFIG.client .get(format!( - "{}Measure/$evaluate-measure?measure={}&periodStart=2000&periodEnd=2030", - CONFIG.blaze_url, - url + "{}Measure/$evaluate-measure?measure={}&periodStart=2000&periodEnd=2030", + CONFIG.blaze_url, + url )) .send() .await @@ -96,19 +95,17 @@ pub async fn evaluate_measure(url: String) -> Result { "Successfully evaluated the Measure with canonical URL: {}", url ); - text = resp + resp .text() .await - .map_err(|e| FocusError::MeasureEvaluationErrorReqwest(e))?; + .map_err(|e| FocusError::MeasureEvaluationErrorReqwest(e)) } else { warn!( "Error while evaluating the Measure with canonical URL `{}`: {:?}", url, resp ); - return Err(FocusError::MeasureEvaluationErrorBlaze(format!( "Error while evaluating the Measure with canonical URL `{}`: {:?}", url, resp))); + 
Err(FocusError::MeasureEvaluationErrorBlaze(format!( "Error while evaluating the Measure with canonical URL `{}`: {:?}", url, resp))) } - - Ok(text) } pub async fn run_cql_query(library: &Value, measure: &Value) -> Result { diff --git a/src/config.rs b/src/config.rs index a775391..85f7c58 100644 --- a/src/config.rs +++ b/src/config.rs @@ -1,12 +1,14 @@ use std::path::PathBuf; +use beam_lib::AppId; use clap::Parser; use http::{HeaderValue, Uri}; +use once_cell::sync::Lazy; use reqwest::{Certificate, Client, Proxy}; -use static_init::dynamic; use tracing::{debug, info, warn}; -use crate::{beam::AppId, errors::FocusError}; +use crate::errors::FocusError; + #[derive(clap::ValueEnum, Clone, Debug)] pub enum Obfuscate { @@ -14,15 +16,13 @@ pub enum Obfuscate { Yes, } - -#[dynamic(lazy)] -pub(crate) static CONFIG: Config = { +pub(crate) static CONFIG: Lazy = Lazy::new(|| { debug!("Loading config"); Config::load().unwrap_or_else(|e| { eprintln!("Unable to start as there was an error reading the config:\n{}\n\nTerminating -- please double-check your startup parameters with --help and refer to the documentation.", e); std::process::exit(1); }) -}; +}); const CLAP_FOOTER: &str = "For proxy support, environment variables HTTP_PROXY, HTTPS_PROXY, ALL_PROXY and NO_PROXY (and their lower-case variants) are supported. 
Usually, you want to set HTTP_PROXY *and* HTTPS_PROXY or set ALL_PROXY if both values are the same.\n\nFor updates and detailed usage instructions, visit https://github.com/samply/focus"; @@ -129,7 +129,7 @@ impl Config { let client = prepare_reqwest_client(&tls_ca_certificates)?; let config = Config { beam_proxy_url: cli_args.beam_proxy_url, - beam_app_id_long: AppId::new(cli_args.beam_app_id_long)?, + beam_app_id_long: AppId::new_unchecked(cli_args.beam_app_id_long), api_key: cli_args.api_key, retry_count: cli_args.retry_count, blaze_url: cli_args.blaze_url, diff --git a/src/errors.rs b/src/errors.rs index 26bfb5a..6bd6757 100644 --- a/src/errors.rs +++ b/src/errors.rs @@ -13,13 +13,9 @@ pub enum FocusError { #[error("CQL query error")] CQLQueryError(), #[error("Unable to retrieve tasks from Beam: {0}")] - UnableToRetrieveTasksHttp(reqwest::Error), - #[error("Unable to retrieve tasks from Beam: {0}")] - UnableToRetrieveTasksOther(String), - #[error("Unable to parse tasks from Beam")] - UnableToParseTasks(reqwest::Error), + UnableToRetrieveTasksHttp(beam_lib::BeamError), #[error("Unable to answer task")] - UnableToAnswerTask(reqwest::Error), + UnableToAnswerTask(beam_lib::BeamError), #[error("Unable to set proxy settings")] InvalidProxyConfig(reqwest::Error), #[error("Decode error")] @@ -28,8 +24,6 @@ pub enum FocusError { ConfigurationError(String), #[error("Cannot open file")] FileOpeningError(String), - #[error("Invalid BeamID")] - InvalidBeamId(String), #[error("Parsing error")] ParsingError(String), #[error("CQL tempered with")] diff --git a/src/graceful_shutdown.rs b/src/graceful_shutdown.rs index 8775a38..18cc534 100644 --- a/src/graceful_shutdown.rs +++ b/src/graceful_shutdown.rs @@ -1,10 +1,8 @@ -use std::time::Duration; - -use tokio::signal::unix::{signal,SignalKind}; use tracing::info; #[cfg(unix)] pub async fn wait_for_signal() { + use tokio::signal::unix::{signal,SignalKind}; let mut sigterm = signal(SignalKind::terminate()) .expect("Unable to 
register shutdown handler; are you running a Unix-based OS?"); let mut sigint = signal(SignalKind::interrupt()) diff --git a/src/main.rs b/src/main.rs index f224bd7..4fb9990 100644 --- a/src/main.rs +++ b/src/main.rs @@ -12,7 +12,7 @@ use std::time::{SystemTime, UNIX_EPOCH}; use std::{process::exit, time::Duration}; use base64::{engine::general_purpose, Engine as _}; -use beam::{BeamResult, BeamTask}; +use beam_lib::{TaskRequest, TaskResult}; use blaze::Query; use serde_json::from_slice; @@ -30,6 +30,8 @@ mod graceful_shutdown; type SearchQuery = String; type Report = String; type Created = std::time::SystemTime; //epoch +type BeamTask = TaskRequest; +type BeamResult = TaskResult; #[derive(Debug, Clone)] struct ReportCache { @@ -178,7 +180,7 @@ async fn process_tasks( let comm_result = if let Some(ref err_msg) = error_msg { beam::fail_task(&task, err_msg).await } else { - beam::answer_task(&task, res.unwrap()).await + beam::answer_task(task.id, res.unwrap()).await }; match comm_result { Ok(_) => break, @@ -220,19 +222,16 @@ async fn run_query( ) -> Result { debug!("Run"); - match query.lang.as_str() { - "cql" => { - // TODO: Change query.lang to an enum - Ok(run_cql_query(task, query, obf_cache, report_cache).await)? 
- }, - unsupported_lang => { - Ok(beam::BeamResult::perm_failed( - CONFIG.beam_app_id_long.clone(), - vec![task.from.clone()], - task.id, - Some(format!("Can't run queries with language {unsupported_lang}")), - )) - } + if query.lang == "cql" { + // TODO: Change query.lang to an enum + return Ok(run_cql_query(task, query, obf_cache, report_cache).await)?; + } else { + return Ok(beam::beam_result::perm_failed( + CONFIG.beam_app_id_long.clone(), + vec![task.from.clone()], + task.id, + format!("Can't run inqueries with language {}", query.lang), + )); } } @@ -242,6 +241,13 @@ async fn run_cql_query( obf_cache: &mut ObfCache, report_cache: &mut ReportCache, ) -> Result { + let mut err = beam::beam_result::perm_failed( + CONFIG.beam_app_id_long.clone(), + vec![task.to_owned().from], + task.to_owned().id, + String::new(), + ); + let encoded_query = query.lib["content"][0]["data"] .as_str() @@ -298,11 +304,11 @@ async fn run_cql_query( }; let result = beam_result(task.to_owned(), cql_result_new).unwrap_or_else(|e| { - beam::BeamResult::perm_failed( + beam::beam_result::perm_failed( CONFIG.beam_app_id_long.clone(), vec![task.to_owned().from], task.to_owned().id, - Some(e.to_string()) + e.to_string() ) }); @@ -348,14 +354,14 @@ fn replace_cql_library(mut query: Query) -> Result { } fn beam_result( - task: beam::BeamTask, + task: BeamTask, measure_report: String, -) -> Result { +) -> Result { let data = general_purpose::STANDARD.encode(measure_report.as_bytes()); - return Ok(beam::BeamResult::succeeded( + return Ok(beam::beam_result::succeeded( CONFIG.beam_app_id_long.clone(), vec![task.from], task.id, - Some(data) + data )); } diff --git a/src/util.rs b/src/util.rs index 3ce4574..aafc585 100644 --- a/src/util.rs +++ b/src/util.rs @@ -6,7 +6,7 @@ use rand::thread_rng; use serde::{Deserialize, Serialize}; use serde_json::{json, Value}; use std::collections::HashMap; -use tracing::{debug, info, warn}; +use tracing::warn; #[derive(Debug, Deserialize, Serialize)] struct Period { 
From 47480f10763c34436ba0ffac480cb3430f457500 Mon Sep 17 00:00:00 2001 From: lablans Date: Sat, 30 Sep 2023 12:21:58 +0000 Subject: [PATCH 19/22] Speedup build by minimizing files to be used in docker build context --- .dockerignore | 2 ++ 1 file changed, 2 insertions(+) create mode 100644 .dockerignore diff --git a/.dockerignore b/.dockerignore new file mode 100644 index 0000000..37ad7ed --- /dev/null +++ b/.dockerignore @@ -0,0 +1,2 @@ +* +!artifacts/ From 763d72e23acd53e1e41455fe4bed48ecfb73febd Mon Sep 17 00:00:00 2001 From: Patrick Skowronek Date: Mon, 2 Oct 2023 11:05:21 +0200 Subject: [PATCH 20/22] fix: add initial statement to exliquid selection --- src/util.rs | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/src/util.rs b/src/util.rs index aafc585..866b723 100644 --- a/src/util.rs +++ b/src/util.rs @@ -100,8 +100,8 @@ pub(crate) fn replace_cql(decoded_library: impl Into) -> String { ("BBMRI_STRAT_AGE_STRATIFIER", "define AgeClass:\n (AgeInYears() div 10) * 10"), ("BBMRI_STRAT_DEF_SPECIMEN", "define Specimen:"), ("BBMRI_STRAT_DEF_IN_INITIAL_POPULATION", "define InInitialPopulation:"), - ("EXLIQUID_CQL", "define ExliquidSpecimen:\n from [Specimen] S\n where S.identifier.system contains 'http://dktk.dkfz.de/fhir/sid/exliquid-specimen'\ndefine retrieveCondition:\n First(from [Condition] C\n return C.code.coding.where(system = 'http://fhir.de/CodeSystem/bfarm/icd-10-gm').code.first())\ndefine Diagnosis:\n if (retrieveCondition is null) then 'unknown' else retrieveCondition\ndefine function SampleType(specimen FHIR.Specimen):\n specimen.type.coding.where(system = 'https://fhir.bbmri.de/CodeSystem/SampleMaterialType').code.first()"), - ("EXLIQUID_ALIQUOTS_CQL", "define Aliquot: [Specimen] S where exists S.collection.quantity.value and exists S.parent.reference and S.container.specimenQuantity.value > 0 define AliquotGroupReferences: flatten Aliquot S return S.parent.reference define AliquotGroupWithAliquot: [Specimen] S where not 
(S.identifier.system contains 'http://dktk.dkfz.de/fhir/sid/exliquid-specimen') and not exists S.collection.quantity.value and not exists S.container.specimenQuantity.value and AliquotGroupReferences contains 'Specimen/' + S.id define PrimarySampleReferences: flatten AliquotGroupWithAliquot S return S.parent.reference define ExliquidSpecimenWithAliquot: from [Specimen] PrimarySample where PrimarySample.identifier.system contains 'http://dktk.dkfz.de/fhir/sid/exliquid-specimen' and PrimarySampleReferences contains 'Specimen/' + PrimarySample.id define retrieveCondition: First(from [Condition] C return C.code.coding.where(system = 'http://fhir.de/CodeSystem/bfarm/icd-10-gm').code.first()) define Diagnosis: if (retrieveCondition is null) then 'unknown' else retrieveCondition define function SampleType(specimen FHIR.Specimen): specimen.type.coding.where(system = 'https://fhir.bbmri.de/CodeSystem/SampleMaterialType').code.first()"), + ("EXLIQUID_CQL", "define ExliquidSpecimen:\nif InInitialPopulation then [Specimen] S\n where S.identifier.system contains 'http://dktk.dkfz.de/fhir/sid/exliquid-specimen'\ndefine retrieveCondition:\n First(from [Condition] C\n return C.code.coding.where(system = 'http://fhir.de/CodeSystem/bfarm/icd-10-gm').code.first())\ndefine Diagnosis:\n if (retrieveCondition is null) then 'unknown' else retrieveCondition\ndefine function SampleType(specimen FHIR.Specimen):\n specimen.type.coding.where(system = 'https://fhir.bbmri.de/CodeSystem/SampleMaterialType').code.first()"), + ("EXLIQUID_ALIQUOTS_CQL", "define Aliquot:\nif InInitialPopulation then [Specimen] S\n where exists S.collection.quantity.value and exists S.parent.reference and S.container.specimenQuantity.value > 0 define AliquotGroupReferences: flatten Aliquot S return S.parent.reference define AliquotGroupWithAliquot: [Specimen] S where not (S.identifier.system contains 'http://dktk.dkfz.de/fhir/sid/exliquid-specimen') and not exists S.collection.quantity.value and not exists 
S.container.specimenQuantity.value and AliquotGroupReferences contains 'Specimen/' + S.id define PrimarySampleReferences: flatten AliquotGroupWithAliquot S return S.parent.reference define ExliquidSpecimenWithAliquot: from [Specimen] PrimarySample where PrimarySample.identifier.system contains 'http://dktk.dkfz.de/fhir/sid/exliquid-specimen' and PrimarySampleReferences contains 'Specimen/' + PrimarySample.id define retrieveCondition: First(from [Condition] C return C.code.coding.where(system = 'http://fhir.de/CodeSystem/bfarm/icd-10-gm').code.first()) define Diagnosis: if (retrieveCondition is null) then 'unknown' else retrieveCondition define function SampleType(specimen FHIR.Specimen): specimen.type.coding.where(system = 'https://fhir.bbmri.de/CodeSystem/SampleMaterialType').code.first()"), ("DKTK_STRAT_GENDER_STRATIFIER", "define Gender:\nif (Patient.gender is null) then 'unknown' else Patient.gender"), ("DKTK_STRAT_AGE_STRATIFIER", "define AgeClass:\nif (Patient.birthDate is null) then 'unknown' else ToString((AgeInYears() div 10) * 10)"), ("DKTK_STRAT_DECEASED_STRATIFIER", "define PatientDeceased:\nFirst (from [Observation: Code '75186-7' from loinc] O return O.value.coding.where(system = 'http://dktk.dkfz.de/fhir/onco/core/CodeSystem/VitalstatusCS').code.first())\ndefine Deceased:\nif (PatientDeceased is null) then 'unbekannt' else PatientDeceased"), @@ -112,8 +112,8 @@ pub(crate) fn replace_cql(decoded_library: impl Into) -> String { ("DKTK_STRAT_MEDICATION_STRATIFIER", "define MedicationStatement:\nif InInitialPopulation then [MedicationStatement] else {} as List "), ("DKTK_STRAT_ENCOUNTER_STRATIFIER", "define Encounter:\nif InInitialPopulation then [Encounter] else {} as List\n\ndefine function Departments(encounter FHIR.Encounter):\nencounter.identifier.where(system = 'http://dktk.dkfz.de/fhir/sid/hki-department').value.first()"), ("DKTK_STRAT_DEF_IN_INITIAL_POPULATION", "define InInitialPopulation:"), - ("EXLIQUID_STRAT_DEF_IN_INITIAL_POPULATION", 
"define InInitialPopulation:\n exists ExliquidSpecimen\n"), - ("EXLIQUID_STRAT_W_ALIQUOTS", "define InInitialPopulation: exists ExliquidSpecimenWithAliquot") + ("EXLIQUID_STRAT_DEF_IN_INITIAL_POPULATION", "define InInitialPopulation:\n exists ExliquidSpecimen and \n"), + ("EXLIQUID_STRAT_W_ALIQUOTS", "define InInitialPopulation: exists ExliquidSpecimenWithAliquot and \n") ].into(); let mut decoded_library = decoded_library.into(); @@ -414,19 +414,19 @@ mod test { assert_eq!(replace_cql(decoded_library), expected_result); let decoded_library = "EXLIQUID_CQL"; - let expected_result = "define ExliquidSpecimen:\n from [Specimen] S\n where S.identifier.system contains 'http://dktk.dkfz.de/fhir/sid/exliquid-specimen'\ndefine retrieveCondition:\n First(from [Condition] C\n return C.code.coding.where(system = 'http://fhir.de/CodeSystem/bfarm/icd-10-gm').code.first())\ndefine Diagnosis:\n if (retrieveCondition is null) then 'unknown' else retrieveCondition\ndefine function SampleType(specimen FHIR.Specimen):\n specimen.type.coding.where(system = 'https://fhir.bbmri.de/CodeSystem/SampleMaterialType').code.first()\n"; + let expected_result = "define ExliquidSpecimen:\nif InInitialPopulation then [Specimen] S\n where S.identifier.system contains 'http://dktk.dkfz.de/fhir/sid/exliquid-specimen'\ndefine retrieveCondition:\n First(from [Condition] C\n return C.code.coding.where(system = 'http://fhir.de/CodeSystem/bfarm/icd-10-gm').code.first())\ndefine Diagnosis:\n if (retrieveCondition is null) then 'unknown' else retrieveCondition\ndefine function SampleType(specimen FHIR.Specimen):\n specimen.type.coding.where(system = 'https://fhir.bbmri.de/CodeSystem/SampleMaterialType').code.first()\n"; assert_eq!(replace_cql(decoded_library), expected_result); let decoded_library = "EXLIQUID_ALIQUOTS_CQL"; - let expected_result = "define Aliquot: [Specimen] S where exists S.collection.quantity.value and exists S.parent.reference and S.container.specimenQuantity.value > 0 define 
AliquotGroupReferences: flatten Aliquot S return S.parent.reference define AliquotGroupWithAliquot: [Specimen] S where not (S.identifier.system contains 'http://dktk.dkfz.de/fhir/sid/exliquid-specimen') and not exists S.collection.quantity.value and not exists S.container.specimenQuantity.value and AliquotGroupReferences contains 'Specimen/' + S.id define PrimarySampleReferences: flatten AliquotGroupWithAliquot S return S.parent.reference define ExliquidSpecimenWithAliquot: from [Specimen] PrimarySample where PrimarySample.identifier.system contains 'http://dktk.dkfz.de/fhir/sid/exliquid-specimen' and PrimarySampleReferences contains 'Specimen/' + PrimarySample.id define retrieveCondition: First(from [Condition] C return C.code.coding.where(system = 'http://fhir.de/CodeSystem/bfarm/icd-10-gm').code.first()) define Diagnosis: if (retrieveCondition is null) then 'unknown' else retrieveCondition define function SampleType(specimen FHIR.Specimen): specimen.type.coding.where(system = 'https://fhir.bbmri.de/CodeSystem/SampleMaterialType').code.first()\n"; + let expected_result = "define Aliquot:\nif InInitialPopulation then [Specimen] S\n where exists S.collection.quantity.value and exists S.parent.reference and S.container.specimenQuantity.value > 0 define AliquotGroupReferences: flatten Aliquot S return S.parent.reference define AliquotGroupWithAliquot: [Specimen] S where not (S.identifier.system contains 'http://dktk.dkfz.de/fhir/sid/exliquid-specimen') and not exists S.collection.quantity.value and not exists S.container.specimenQuantity.value and AliquotGroupReferences contains 'Specimen/' + S.id define PrimarySampleReferences: flatten AliquotGroupWithAliquot S return S.parent.reference define ExliquidSpecimenWithAliquot: from [Specimen] PrimarySample where PrimarySample.identifier.system contains 'http://dktk.dkfz.de/fhir/sid/exliquid-specimen' and PrimarySampleReferences contains 'Specimen/' + PrimarySample.id define retrieveCondition: First(from [Condition] C 
return C.code.coding.where(system = 'http://fhir.de/CodeSystem/bfarm/icd-10-gm').code.first()) define Diagnosis: if (retrieveCondition is null) then 'unknown' else retrieveCondition define function SampleType(specimen FHIR.Specimen): specimen.type.coding.where(system = 'https://fhir.bbmri.de/CodeSystem/SampleMaterialType').code.first()\n"; assert_eq!(replace_cql(decoded_library), expected_result); let decoded_library = "EXLIQUID_STRAT_DEF_IN_INITIAL_POPULATION"; - let expected_result = "define InInitialPopulation:\n exists ExliquidSpecimen\n\n"; + let expected_result = "define InInitialPopulation:\n exists ExliquidSpecimen and \n\n"; assert_eq!(replace_cql(decoded_library), expected_result); let decoded_library = "EXLIQUID_STRAT_W_ALIQUOTS"; - let expected_result = "define InInitialPopulation: exists ExliquidSpecimenWithAliquot\n"; + let expected_result = "define InInitialPopulation: exists ExliquidSpecimenWithAliquot and \n\n"; assert_eq!(replace_cql(decoded_library), expected_result); let decoded_library = "INVALID_KEY"; From b9d5aeb2c952079184c73900d73b6817af70d488 Mon Sep 17 00:00:00 2001 From: Patrick Skowronek Date: Mon, 2 Oct 2023 11:16:41 +0200 Subject: [PATCH 21/22] fix: corrected env_vars naming --- README.md | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/README.md b/README.md index c9bd160..b1329e2 100644 --- a/README.md +++ b/README.md @@ -16,7 +16,7 @@ To run a standalone Focus, you need at least one running [Samply.Beam.Proxy](htt You can compile and run this application via Cargo, however, we encourage the usage of the pre-compiled [docker images](https://hub.docker.com/r/samply/focus): ```bash -docker run --rm -e BEAM_BASE_URL=http://localhost:8081 -e BLAZE_BASE_URL=http://localhost:8089/fhir -e 
PROXY_ID=proxy1.broker -e API_KEY=App1Secret -e BEAM_APP_ID_LONG=app1.broker.example.com samply/focus:latest ``` ## Configuration @@ -24,8 +24,8 @@ docker run --rm -e BEAM_BASE_URL=http://localhost:8081 -e BLAZE_BASE_URL=http:// The following environment variables are mandatory for the usage of Focus. If compiling and running Focus yourself, they are provided as command line options as well. See `focus --help` for details. ```bash -BEAM_BASE_URL = "http://localhost:8081" -BLAZE_BASE_URL = "http://localhost:8089/fhir" +BEAM_PROXY_URL = "http://localhost:8081" +BLAZE_URL = "http://localhost:8089/fhir" PROXY_ID = "proxy1.broker" API_KEY = "App1Secret" BEAM_APP_ID_LONG = "app1.broker.example.com" @@ -36,16 +36,16 @@ BEAM_APP_ID_LONG = "app1.broker.example.com" ```bash RETRY_COUNT = "32" # The maximum number of retries for beam and blaze healthchecks, default value: 32 OBFUSCATE = "yes" # Should the results be obfuscated - the "master switch", allowed values: "yes", "no", default value: "yes" -OBFUSCATE-BELOW-10 = "1" # The mode of obfuscating values below 10: 0 - return zero, 1 - return ten, 2 - obfuscate using Laplace distribution and rounding, has no effect if OBFUSCATE = "no", default value: 1 -DELTA-PATIENT = "1." # Sensitivity parameter for obfuscating the counts in the Patient stratifier, has no effect if OBFUSCATE = "no", default value: 1 -DELTA-SPECIMEN = "20." # Sensitivity parameter for obfuscating the counts in the Specimen stratifier, has no effect if OBFUSCATE = "no", default value: 20 -DELTA-DIAGNOSIS = "3." # Sensitivity parameter for obfuscating the counts in the Diagnosis stratifier, has no effect if OBFUSCATE = "no", default value: 3 +OBFUSCATE_BELOW_10_MODE = "1" # The mode of obfuscating values below 10: 0 - return zero, 1 - return ten, 2 - obfuscate using Laplace distribution and rounding, has no effect if OBFUSCATE = "no", default value: 1 +DELTA_PATIENT = "1." 
# Sensitivity parameter for obfuscating the counts in the Patient stratifier, has no effect if OBFUSCATE = "no", default value: 1 +DELTA_SPECIMEN = "20." # Sensitivity parameter for obfuscating the counts in the Specimen stratifier, has no effect if OBFUSCATE = "no", default value: 20 +DELTA_DIAGNOSIS = "3." # Sensitivity parameter for obfuscating the counts in the Diagnosis stratifier, has no effect if OBFUSCATE = "no", default value: 3 EPSILON = "0.1" # Privacy budget parameter for obfuscating the counts in the stratifiers, has no effect if OBFUSCATE = "no", default value: 0.1 -ROUNDING-STEP = "10" # The granularity of the rounding of the obfuscated values, has no effect if OBFUSCATE = "no", default value: 10 +ROUNDING_STEP = "10" # The granularity of the rounding of the obfuscated values, has no effect if OBFUSCATE = "no", default value: 10 QUERIES_TO_CACHE_FILE_PATH = "resources/bbmri" # The path to the file containing BASE64 encoded queries whose results are to be cached, if not set, no results are cached ``` -Obfuscating zero counts is by default switched off. To enable obfuscating zero counts, set the env. variable `OBFUSCATE-ZERO`. +Obfuscating zero counts is by default switched off. To enable obfuscating zero counts, set the env. variable `OBFUSCATE_ZERO`. Optionally, you can provide the `TLS_CA_CERTIFICATES_DIR` environment variable to add additional trusted certificates, e.g., if you have a TLS-terminating proxy server in place. The application respects the `HTTP_PROXY`, `HTTPS_PROXY`, `ALL_PROXY`, `NO_PROXY`, and their respective lowercase equivalents. 
From 9d515fb93af4f76d46093d77de1532e8fa15338f Mon Sep 17 00:00:00 2001 From: janskiba Date: Wed, 4 Oct 2023 14:51:00 +0000 Subject: [PATCH 22/22] Use new api --- src/beam.rs | 21 +++++++++++---------- src/main.rs | 2 +- 2 files changed, 12 insertions(+), 11 deletions(-) diff --git a/src/beam.rs b/src/beam.rs index 4558dca..de17cec 100644 --- a/src/beam.rs +++ b/src/beam.rs @@ -1,6 +1,6 @@ use std::time::Duration; -use beam_lib::{TaskResult, BeamClient, BlockingOptions, MsgId, TaskRequest}; +use beam_lib::{TaskResult, BeamClient, BlockingOptions, MsgId, TaskRequest, RawString}; use once_cell::sync::Lazy; use serde::{de::DeserializeOwned, Serialize}; use tracing::{debug, warn}; @@ -8,9 +8,9 @@ use tracing::{debug, warn}; use crate::{config::CONFIG, errors::FocusError}; pub mod beam_result { - use beam_lib::{AppId, TaskResult, WorkStatus, MsgId}; - use serde::Serialize; + use super::*; use serde_json::Value; + use beam_lib::{WorkStatus, AppId}; pub fn claimed(from: AppId, to: Vec, task_id: MsgId) -> TaskResult<()> { TaskResult { @@ -22,25 +22,25 @@ pub mod beam_result { body: (), } } - pub fn succeeded(from: AppId, to: Vec, task_id: MsgId, body: T) -> TaskResult { + pub fn succeeded(from: AppId, to: Vec, task_id: MsgId, body: String) -> TaskResult { TaskResult { from, to, task: task_id, status: WorkStatus::Succeeded, metadata: Value::Null, - body, + body: body.into(), } } - pub fn perm_failed(from: AppId, to: Vec, task_id: MsgId, body: T) -> TaskResult { + pub fn perm_failed(from: AppId, to: Vec, task_id: MsgId, body: String) -> TaskResult { TaskResult { from, to, task: task_id, status: WorkStatus::PermFailed, metadata: Value::Null, - body, + body: body.into(), } } } @@ -73,18 +73,19 @@ static BEAM_CLIENT: Lazy = Lazy::new(|| BeamClient::new( CONFIG.beam_proxy_url.to_string().parse().expect("Uri always converts to url") )); -pub async fn retrieve_tasks() -> Result>, FocusError> { +pub async fn retrieve_tasks() -> Result>, FocusError> { debug!("Retrieving tasks..."); 
let block = BlockingOptions { wait_time: Some(Duration::from_secs(10)), wait_count: Some(1) }; - BEAM_CLIENT.poll_pending_tasks(&block) + BEAM_CLIENT.poll_pending_tasks::(&block) .await + .map(|v| v.into_iter().map(|TaskRequest { id, body, from, to, metadata, ttl, failure_strategy }| TaskRequest { body: body.into_string(), id, from, to, metadata, ttl, failure_strategy }).collect()) .map_err(FocusError::UnableToRetrieveTasksHttp) } -pub async fn answer_task(task_id: MsgId, result: &TaskResult) -> Result { +pub async fn answer_task(task_id: MsgId, result: &TaskResult) -> Result { debug!("Answer task with id: {task_id}"); BEAM_CLIENT.put_result(result, &task_id) .await diff --git a/src/main.rs b/src/main.rs index 4fb9990..3bf175c 100644 --- a/src/main.rs +++ b/src/main.rs @@ -31,7 +31,7 @@ type SearchQuery = String; type Report = String; type Created = std::time::SystemTime; //epoch type BeamTask = TaskRequest; -type BeamResult = TaskResult; +type BeamResult = TaskResult; #[derive(Debug, Clone)] struct ReportCache {