Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Query Languages as features #166

Merged
merged 5 commits into from
Sep 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 10 additions & 8 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "focus"
version = "0.6.0"
version = "0.7.0"
edition = "2021"
license = "Apache-2.0"

Expand All @@ -14,32 +14,34 @@ serde_json = "1.0"
thiserror = "1.0.38"
chrono = "0.4.31"
indexmap = "2.1.0"
tokio = { version = "1.25.0", default_features = false, features = ["signal", "rt-multi-thread", "macros"] }
tokio = { version = "1.25.0", default-features = false, features = ["signal", "rt-multi-thread", "macros"] }
beam-lib = { git = "https://github.com/samply/beam", branch = "develop", features = ["http-util"] }
laplace_rs = {git = "https://github.com/samply/laplace-rs.git", tag = "v0.3.0" }
uuid = "1.8.0"
rand = { default-features = false, version = "0.8.5" }
futures-util = { version = "0.3", default-features = false, features = ["std"] }
sqlx = { version = "0.8.2", features = [ "runtime-tokio", "postgres", "macros", "chrono"] }
tryhard = "0.5"
kurtbuilds_sqlx_serde = "0.3.1"

# Logging
tracing = { version = "0.1.37", default_features = false }
tracing-subscriber = { version = "0.3.11", default_features = false, features = ["env-filter", "ansi"] }
tracing = { version = "0.1.37", default-features = false }
tracing-subscriber = { version = "0.3.11", default-features = false, features = ["env-filter", "ansi"] }

# Global variables
once_cell = "1.18"

# Command Line Interface
clap = { version = "4", default_features = false, features = ["std", "env", "derive", "help", "color"] }
clap = { version = "4", default-features = false, features = ["std", "env", "derive", "help", "color"] }

# Query via SQL
sqlx = { version = "0.8.2", features = [ "runtime-tokio", "postgres", "macros", "chrono", "rust_decimal", "uuid"], optional = true }
kurtbuilds_sqlx_serde = { version = "0.3.2", features = [ "json", "decimal", "chrono", "uuid"], optional = true }


[features]
default = []
bbmri = []
dktk = []
dktk = ["query-sql"]
query-sql = ["dep:sqlx", "dep:kurtbuilds_sqlx_serde"]

[dev-dependencies]
pretty_assertions = "1.4.0"
Expand Down
1 change: 0 additions & 1 deletion resources/sql/SELECT_TABLES

This file was deleted.

1 change: 1 addition & 0 deletions resources/sql/SELECT_TEST
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
SELECT 10 AS VALUE, quote_literal('Hello Rustaceans') AS GREETING, 4.7 as FLOATY, CURRENT_DATE AS TODAY;
7 changes: 7 additions & 0 deletions src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,9 @@ pub enum Obfuscate {
pub enum EndpointType {
Blaze,
Omop,
#[cfg(feature = "query-sql")]
BlazeAndSql,
#[cfg(feature = "query-sql")]
Sql,
}

Expand All @@ -29,7 +31,9 @@ impl fmt::Display for EndpointType {
match self {
EndpointType::Blaze => write!(f, "blaze"),
EndpointType::Omop => write!(f, "omop"),
#[cfg(feature = "query-sql")]
EndpointType::BlazeAndSql => write!(f, "blaze_and_sql"),
#[cfg(feature = "query-sql")]
EndpointType::Sql => write!(f, "sql"),
}
}
Expand Down Expand Up @@ -159,6 +163,7 @@ struct CliArgs {
auth_header: Option<String>,

/// Database connection string
#[cfg(feature = "query-sql")]
#[clap(long, env, value_parser)]
postgres_connection_string: Option<String>,
}
Expand Down Expand Up @@ -188,6 +193,7 @@ pub(crate) struct Config {
pub provider: Option<String>,
pub provider_icon: Option<String>,
pub auth_header: Option<String>,
#[cfg(feature = "query-sql")]
pub postgres_connection_string: Option<String>,
}

Expand Down Expand Up @@ -230,6 +236,7 @@ impl Config {
provider: cli_args.provider,
provider_icon: cli_args.provider_icon,
auth_header: cli_args.auth_header,
#[cfg(feature = "query-sql")]
postgres_connection_string: cli_args.postgres_connection_string,
client,
};
Expand Down
26 changes: 26 additions & 0 deletions src/db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,3 +62,29 @@ pub fn serialize_rows(rows: Vec<PgRow>) -> Result<Value, FocusError> {

Ok(Value::Array(rows_json))
}

#[cfg(test)]
mod test {
use super::*;

#[tokio::test]
#[ignore] //TODO mock DB
async fn serialize() {
let pool =
get_pg_connection_pool("postgresql://postgres:secret@localhost:5432/postgres", 1)
.await
.unwrap();

let rows = run_query(&pool, SQL_REPLACE_MAP.get("SELECT_TEST").unwrap())
.await
.unwrap();

dbg!(&rows);
let rows_json = serialize_rows(rows).unwrap();
dbg!(&rows_json);

assert!(rows_json.is_array());

assert_ne!(rows_json[0]["floaty"], Value::Null);
}
}
5 changes: 3 additions & 2 deletions src/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,12 +63,13 @@ pub enum FocusError {
MissingExporterTaskType,
#[error("Cannot connect to database: {0}")]
CannotConnectToDatabase(String),
#[error("Error executing query: {0}")]
ErrorExecutingQuery(sqlx::Error),
#[error("QueryResultBad: {0}")]
QueryResultBad(String),
#[error("Query not allowed: {0}")]
QueryNotAllowed(String),
#[cfg(feature = "query-sql")]
#[error("Error executing query: {0}")]
ErrorExecutingQuery(sqlx::Error),
}

impl FocusError {
Expand Down
42 changes: 34 additions & 8 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,20 +8,21 @@ mod errors;
mod graceful_shutdown;
mod logger;

mod db;
mod exporter;
mod intermediate_rep;
mod projects;
mod task_processing;
mod util;

#[cfg(feature = "query-sql")]
mod db;

use base64::engine::general_purpose;
use base64::{engine::general_purpose::STANDARD as BASE64, Engine as _};
use beam_lib::{TaskRequest, TaskResult};
use futures_util::future::BoxFuture;
use futures_util::FutureExt;
use laplace_rs::ObfCache;
use sqlx::PgPool;
use tokio::sync::Mutex;

use crate::blaze::{parse_blaze_query_payload_ast, AstQuery};
Expand Down Expand Up @@ -118,22 +119,45 @@ pub async fn main() -> ExitCode {
}
}

async fn main_loop() -> ExitCode {
let db_pool = if let Some(connection_string) = CONFIG.postgres_connection_string.clone() {
#[cfg(not(feature = "query-sql"))]
type DbPool = ();

#[cfg(feature = "query-sql")]
type DbPool = sqlx::PgPool;

#[cfg(not(feature = "query-sql"))]
async fn get_db_pool() -> Result<Option<DbPool>,ExitCode> {
Ok(None)
}

#[cfg(feature = "query-sql")]
async fn get_db_pool() -> Result<Option<DbPool>,ExitCode> {
if let Some(connection_string) = CONFIG.postgres_connection_string.clone() {
match db::get_pg_connection_pool(&connection_string, 8).await {
Err(e) => {
error!("Error connecting to database: {}", e);
return ExitCode::from(8);
Err(ExitCode::from(8))
}
Ok(pool) => Some(pool),
Ok(pool) => Ok(Some(pool)),
}
} else {
None
Ok(None)
}
}

async fn main_loop() -> ExitCode {
let db_pool = match get_db_pool().await {
Ok(pool) => pool,
Err(code) => {
return code;
},
};
let endpoint_service_available: fn() -> BoxFuture<'static, bool> = match CONFIG.endpoint_type {
EndpointType::Blaze => || blaze::check_availability().boxed(),
EndpointType::Omop => || async { true }.boxed(), // TODO health check
#[cfg(feature = "query-sql")]
EndpointType::BlazeAndSql => || blaze::check_availability().boxed(),
#[cfg(feature = "query-sql")]
EndpointType::Sql => || async { true }.boxed(),
};
let mut failures = 0;
Expand Down Expand Up @@ -169,7 +193,7 @@ async fn process_task(
task: &BeamTask,
obf_cache: Arc<Mutex<ObfCache>>,
report_cache: Arc<Mutex<ReportCache>>,
db_pool: Option<PgPool>,
db_pool: Option<DbPool>,
) -> Result<BeamResult, FocusError> {
debug!("Processing task {}", task.id);

Expand Down Expand Up @@ -217,6 +241,7 @@ async fn process_task(
)
.await
},
#[cfg(feature = "query-sql")]
EndpointType::BlazeAndSql => {
let mut generated_from_ast: bool = false;
let data = base64_decode(&task.body)?;
Expand Down Expand Up @@ -260,6 +285,7 @@ async fn process_task(
.await
}
},
#[cfg(feature="query-sql")]
EndpointType::Sql => {
let data = base64_decode(&task.body)?;
let query_maybe: Result<db::SqlQuery, serde_json::Error> = serde_json::from_slice(&(data));
Expand Down
Loading