From 650eb1f019de3a921d739c5d00218b4151b4411b Mon Sep 17 00:00:00 2001
From: Alexander_Kabui
Date: Wed, 21 Jun 2023 00:00:35 +0300
Subject: [PATCH 1/4] code to fetch lmdb info for rust correlation preprocessing

---
 wqflask/wqflask/correlation/pre_computes.py | 56 ++++++++++++++++++---
 1 file changed, 48 insertions(+), 8 deletions(-)

diff --git a/wqflask/wqflask/correlation/pre_computes.py b/wqflask/wqflask/correlation/pre_computes.py
index 2831bd399..e82eb0ec0 100644
--- a/wqflask/wqflask/correlation/pre_computes.py
+++ b/wqflask/wqflask/correlation/pre_computes.py
@@ -14,25 +14,26 @@ from json.decoder import JSONDecodeError
 
 
-def cache_trait_metadata(dataset_name, data):
+def cache_trait_metadata(dataset_name, data):
     try:
-        with lmdb.open(os.path.join(TMPDIR,f"metadata_{dataset_name}"),map_size=20971520) as env:
-            with env.begin(write=True) as txn:
+        with lmdb.open(os.path.join(TMPDIR, f"metadata_{dataset_name}"), map_size=20971520) as env:
+            with env.begin(write=True) as txn:
                 data_bytes = pickle.dumps(data)
                 txn.put(f"{dataset_name}".encode(), data_bytes)
                 current_date = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
                 txn.put(b"creation_date", current_date.encode())
                 return "success"
-    except lmdb.Error as error:
+    except lmdb.Error as error:
         pass
 
+
 def read_trait_metadata(dataset_name):
     try:
-        with lmdb.open(os.path.join(TMPDIR,f"metadata_{dataset_name}"),
-                       readonly=True, lock=False) as env:
+        with lmdb.open(os.path.join(TMPDIR, f"metadata_{dataset_name}"),
+                       readonly=True, lock=False) as env:
             with env.begin() as txn:
                 db_name = txn.get(dataset_name.encode())
                 return (pickle.loads(db_name) if db_name else {})
@@ -82,8 +83,6 @@ def generate_filename(*args, suffix="", file_ext="json"):
     return f"{hashlib.md5(string_unicode).hexdigest()}_{suffix}.{file_ext}"
 
 
-
-
 def fetch_text_file(dataset_name, conn, text_dir=TMPDIR):
     """fetch textfiles with strain vals if exists"""
 
@@ -176,3 +175,44 @@ def __write_to_file__(file_path, data, col_names):
     if (results and file_name):
         __write_to_file__(os.path.join(text_dir, file_name),
                           *__parse_to_dict__(results))
+
+
+# check for file path
+# I need the lmdb path # tmpdir
+def __generate_target_name__(db_name):
+    # todo add expiry time and checker
+    with conn.cursor() as cursor:
+        cursor.execute(
+            'SELECT Id, FullName FROM ProbeSetFreeze WHERE Name = %s', (db_name,))
+        results = cursor.fetchone()
+        if (results):
+            return __sanitise_filename__(
+                f"ProbeSetFreezeId_{results[0]}_{results[1]}")
+
+
+def fetch_csv_info(textfile_path: str, db_target_name: str):
+    """
+    alternative for processing csv textfiles with rust
+    ! currently experimental
+    """
+    raise ValueError
+
+
+def fetch_lmdb_info(db_path: str, db_target_name: str):
+    """
+    check if the lmdb db exists and also the target db,
+    e.g. ProbeSets: ProbeSetFreezeId_112_<FullName>
+    """
+    # open the db read-only; if the lookup returns None the caller should write the target file
+    with lmdb.open(target_file_path, readonly=True, lock=False) as env:
+        with env.begin() as txn:
+            target_key = __generate_file_name__(db_target_name)
+            dataset = txn.get(target_key.encode())
+            if dataset:
+                return {
+                    "lmdb_target_path": f"{db_path}data.mdb",
+                    "lmdb_target_key": target_key,
+                    "file_type": "lmdb",
+                }
+
+    return {}
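
The lmdb put/get pattern used by cache_trait_metadata and read_trait_metadata above can be exercised on its own. A minimal self-contained sketch, assuming only the py-lmdb package; the dataset name and cached value are placeholders, not data from these patches:

import os
import pickle
import tempfile

import lmdb

TMPDIR = tempfile.gettempdir()
dataset_name = "ExampleDataset"  # placeholder, not a real GeneNetwork dataset
db_dir = os.path.join(TMPDIR, f"metadata_{dataset_name}")

# write side: mirrors cache_trait_metadata (write transaction, pickled payload)
with lmdb.open(db_dir, map_size=20971520) as env:
    with env.begin(write=True) as txn:
        txn.put(dataset_name.encode(), pickle.dumps({"trait_1": {"symbol": "example"}}))

# read side: mirrors read_trait_metadata (readonly environment, locking disabled)
with lmdb.open(db_dir, readonly=True, lock=False) as env:
    with env.begin() as txn:
        cached = txn.get(dataset_name.encode())
        print(pickle.loads(cached) if cached else {})
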
From 7a5f04106ba3df4f3268d56d8813105d31d66733 Mon Sep 17 00:00:00 2001
From: Alexander_Kabui
Date: Wed, 21 Jun 2023 00:06:30 +0300
Subject: [PATCH 2/4] code to fetch csv info

---
 wqflask/wqflask/correlation/pre_computes.py | 10 ++++++++--
 1 file changed, 8 insertions(+), 2 deletions(-)

diff --git a/wqflask/wqflask/correlation/pre_computes.py b/wqflask/wqflask/correlation/pre_computes.py
index e82eb0ec0..c91e3862d 100644
--- a/wqflask/wqflask/correlation/pre_computes.py
+++ b/wqflask/wqflask/correlation/pre_computes.py
@@ -190,12 +190,18 @@ def __generate_target_name__(db_name):
             f"ProbeSetFreezeId_{results[0]}_{results[1]}")
 
 
-def fetch_csv_info(textfile_path: str, db_target_name: str):
+def fetch_csv_info(db_target_name: str, conn):
     """
     alternative for processing csv textfiles with rust
     ! currently experimental
     """
-    raise ValueError
+    csv_file_path = fetch_text_file(dataset_name, conn)
+    if csv_file_path:
+        return {
+            "file_type": "csv",
+            "lmdb_target_path": "csv_file_path",
+            "lmdb_target_path": "",
+        }
 
 
 def fetch_lmdb_info(db_path: str, db_target_name: str):
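
Both fetch_csv_info and fetch_lmdb_info are meant to hand the Rust correlation code a small dict describing where the precomputed values live. A sketch of that contract with a hypothetical consumer; describe_precompute_source is not a function in these patches, and the path and key are made-up examples:

from typing import Optional


def describe_precompute_source(info: Optional[dict]) -> str:
    # an empty/None info dict means no precomputed file was found
    if not info:
        return "no precomputed source; compute the correlation the normal way"
    if info["file_type"] == "lmdb":
        return (f"read key {info['lmdb_target_key']} "
                f"from lmdb file {info['lmdb_target_path']}")
    return f"stream the csv text file at {info['lmdb_target_path']}"


print(describe_precompute_source({
    "lmdb_target_path": "/tmp/precomputes/data.mdb",
    "lmdb_target_key": "ProbeSetFreezeId_112_ExampleFullName",
    "file_type": "lmdb",
}))
print(describe_precompute_source({}))
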
From 46d32a65d27a5577d7a61f76fb55c7c2f32f46d3 Mon Sep 17 00:00:00 2001
From: Alexander_Kabui
Date: Wed, 21 Jun 2023 00:19:23 +0300
Subject: [PATCH 3/4] generate general info for file output

---
 wqflask/wqflask/correlation/pre_computes.py | 39 +++++++++++++++++++++++------------
 1 file changed, 27 insertions(+), 12 deletions(-)

diff --git a/wqflask/wqflask/correlation/pre_computes.py b/wqflask/wqflask/correlation/pre_computes.py
index c91e3862d..81111a3cc 100644
--- a/wqflask/wqflask/correlation/pre_computes.py
+++ b/wqflask/wqflask/correlation/pre_computes.py
@@ -210,15 +210,30 @@ def fetch_lmdb_info(db_path: str, db_target_name: str):
     e.g. ProbeSets: ProbeSetFreezeId_112_<FullName>
     """
     # open the db read-only; if the lookup returns None the caller should write the target file
-    with lmdb.open(target_file_path, readonly=True, lock=False) as env:
-        with env.begin() as txn:
-            target_key = __generate_file_name__(db_target_name)
-            dataset = txn.get(target_key.encode())
-            if dataset:
-                return {
-                    "lmdb_target_path": f"{db_path}data.mdb",
-                    "lmdb_target_key": target_key,
-                    "file_type": "lmdb",
-                }
-
-    return {}
+    try:
+        with lmdb.open(target_file_path, readonly=True, lock=False) as env:
+            with env.begin() as txn:
+                target_key = __generate_file_name__(db_target_name)
+                dataset = txn.get(target_key.encode())
+                if dataset:
+                    return {
+                        "lmdb_target_path": f"{db_path}data.mdb",
+                        "lmdb_target_key": target_key,
+                        "file_type": "lmdb",
+                    }
+    except Exception:
+        return {}
+
+
+def generate_general_info(trait_name, sample_names,
+                          strains_vals, file_type_info):
+    if not file_type_info:
+        #! code should not be reached at this point
+        pass
+        # implement fetch code
+    return {
+        "trait_name": target_name,
+        "primary_sample_names": primary_sample_names,
+        "primary_strains_vals": strain_vals,
+        **file_type_info
+    }
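
generate_general_info builds the payload for the file-based correlation run by unpacking the file-type dict into the trait and sample information. A toy illustration of that ** merge; every value below is a placeholder, not data from the patches:

file_type_info = {
    "lmdb_target_path": "/tmp/precomputes/data.mdb",
    "lmdb_target_key": "ProbeSetFreezeId_112_ExampleFullName",
    "file_type": "lmdb",
}

# the ** unpacking folds the file description into the same flat payload
payload = {
    "trait_name": "example_trait",
    "primary_sample_names": ["BXD1", "BXD2", "BXD5"],
    "primary_strains_vals": [9.2, 8.7, 10.1],
    **file_type_info,
}
print(payload["file_type"], payload["lmdb_target_key"])
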
From 0db7bfa307207c38d1727918f5192b39ffe1bda5 Mon Sep 17 00:00:00 2001
From: Alexander_Kabui
Date: Wed, 21 Jun 2023 01:05:40 +0300
Subject: [PATCH 4/4] code refactoring

---
 wqflask/wqflask/correlation/pre_computes.py   | 54 ++++++++++---------
 .../wqflask/correlation/rust_correlation.py   | 13 +++++
 2 files changed, 41 insertions(+), 26 deletions(-)

diff --git a/wqflask/wqflask/correlation/pre_computes.py b/wqflask/wqflask/correlation/pre_computes.py
index 81111a3cc..65774326e 100644
--- a/wqflask/wqflask/correlation/pre_computes.py
+++ b/wqflask/wqflask/correlation/pre_computes.py
@@ -13,6 +13,7 @@
 from base.webqtlConfig import TMPDIR
 
 from json.decoder import JSONDecodeError
+from gn3.db_utils import database_connection
 
 
 def cache_trait_metadata(dataset_name, data):
@@ -177,47 +178,47 @@ def __write_to_file__(file_path, data, col_names):
                           *__parse_to_dict__(results))
 
 
-# check for file path
-# I need the lmdb path # tmpdir
 def __generate_target_name__(db_name):
     # todo add expiry time and checker
-    with conn.cursor() as cursor:
-        cursor.execute(
-            'SELECT Id, FullName FROM ProbeSetFreeze WHERE Name = %s', (db_name,))
-        results = cursor.fetchone()
-        if (results):
-            return __sanitise_filename__(
-                f"ProbeSetFreezeId_{results[0]}_{results[1]}")
+    with database_connection(SQL_URI) as conn:
+        with conn.cursor() as cursor:
+            cursor.execute(
+                'SELECT Id, FullName FROM ProbeSetFreeze WHERE Name = %s', (db_name,))
+            results = cursor.fetchone()
+            if (results):
+                return __sanitise_filename__(
+                    f"ProbeSetFreezeId_{results[0]}_{results[1]}")
 
 
-def fetch_csv_info(db_target_name: str, conn):
+def fetch_csv_info(db_target_name: str):
     """
     alternative for processing csv textfiles with rust
     ! currently experimental
     """
-    csv_file_path = fetch_text_file(dataset_name, conn)
-    if csv_file_path:
-        return {
-            "file_type": "csv",
-            "lmdb_target_path": "csv_file_path",
-            "lmdb_target_path": "",
-        }
+    with database_connection(SQL_URI) as conn:
+        csv_file_path = fetch_text_file(db_target_name, conn)
+        if csv_file_path:
+            return {
+                "file_type": "csv",
+                "lmdb_target_path": csv_file_path,
+                "lmdb_target_key": "",
+            }
 
 
-def fetch_lmdb_info(db_path: str, db_target_name: str):
+def fetch_lmdb_info(db_target_name: str, lmdb_dataset_path=LMDB_PATH):
     """
     check if the lmdb db exists and also the target db,
     e.g. ProbeSets: ProbeSetFreezeId_112_<FullName>
     """
     # open the db read-only; if the lookup returns None the caller should write the target file
     try:
-        with lmdb.open(target_file_path, readonly=True, lock=False) as env:
+        with lmdb.open(lmdb_dataset_path, readonly=True, lock=False) as env:
             with env.begin() as txn:
-                target_key = __generate_file_name__(db_target_name)
+                target_key = __generate_target_name__(db_target_name)
                 dataset = txn.get(target_key.encode())
                 if dataset:
                     return {
-                        "lmdb_target_path": f"{db_path}data.mdb",
+                        "lmdb_target_path": f"{lmdb_dataset_path}/data.mdb",
                         "lmdb_target_key": target_key,
                         "file_type": "lmdb",
                     }
@@ -225,15 +226,16 @@
     return {}
 
 
-def generate_general_info(trait_name, sample_names,
-                          strains_vals, file_type_info):
+def generate_general_info(trait_name: str, corr_type: str,
+                          sample_dict: dict, file_type_info: dict):
     if not file_type_info:
         #! code should not be reached at this point
         pass
         # implement fetch code
     return {
-        "trait_name": target_name,
-        "primary_sample_names": primary_sample_names,
-        "primary_strains_vals": strain_vals,
+        "trait_name": trait_name,
+        "corr_type": corr_type,
+        "primary_sample_names": list(sample_dict.keys()),
+        "primary_strains_vals": list(sample_dict.values()),
         **file_type_info
     }
diff --git a/wqflask/wqflask/correlation/rust_correlation.py b/wqflask/wqflask/correlation/rust_correlation.py
index 41dd77a1d..fe1260d72 100644
--- a/wqflask/wqflask/correlation/rust_correlation.py
+++ b/wqflask/wqflask/correlation/rust_correlation.py
@@ -19,6 +19,7 @@
 from gn3.computations.rust_correlation import run_correlation
 from gn3.computations.rust_correlation import get_sample_corr_data
 from gn3.computations.rust_correlation import parse_tissue_corr_data
+from gn3.computations.rust_correlation import run_lmdb_correlation
 from gn3.db_utils import database_connection
 
 from wqflask.correlation.exceptions import WrongCorrelationType
@@ -258,6 +259,18 @@ def __compute_sample_corr__(
         return {}
 
     if target_dataset.type == "ProbeSet" and start_vars.get("use_cache") == "true":
+
+        # try to fetch the lmdb info for this dataset first
+        # todo: add a merge case for the csv file
+        try:
+
+            lmdb_info = fetch_lmdb_info(target_dataset.name)
+            if lmdb_info:
+                return run_lmdb_correlation(lmdb_info)
+        except Exception:
+            # fall back to computing the correlation the normal way
+            pass
+
         with database_connection(SQL_URI) as conn:
             file_path = fetch_text_file(target_dataset.name, conn)
            if file_path:
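
The dispatch added to __compute_sample_corr__ boils down to: use the lmdb precompute when fetch_lmdb_info finds one, otherwise fall back quietly to the existing text-file/SQL path. A stripped-down sketch of that control flow, where the callables and the dataset name are stand-ins rather than the real gn3 functions:

def compute_with_fallback(dataset_name, fetch_lmdb_info,
                          run_lmdb_correlation, run_normal_correlation):
    # mirror the try/except in __compute_sample_corr__: any lmdb failure
    # must not break the correlation page, it only disables the fast path
    try:
        lmdb_info = fetch_lmdb_info(dataset_name)
        if lmdb_info:
            return run_lmdb_correlation(lmdb_info)
    except Exception:
        pass
    return run_normal_correlation(dataset_name)


print(compute_with_fallback(
    "ExampleDataset",
    fetch_lmdb_info=lambda name: {},  # pretend no lmdb precompute exists
    run_lmdb_correlation=lambda info: info,
    run_normal_correlation=lambda name: f"normal correlation for {name}",
))
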