diff --git a/metabolights_utils/isa_file_utils.py b/metabolights_utils/isa_file_utils.py index 88844d9..52aabdb 100644 --- a/metabolights_utils/isa_file_utils.py +++ b/metabolights_utils/isa_file_utils.py @@ -1,10 +1,11 @@ import logging import os -from typing import Any, Union +from typing import Any, List, Tuple, Union from metabolights_utils.isatab import Writer -from metabolights_utils.models.isa.common import IsaTableFile from metabolights_utils.models.metabolights.model import MetabolightsStudyModel +from metabolights_utils.models.isa.common import IsaTable, IsaTableColumn, IsaTableFile +from metabolights_utils.models.isa.enums import ColumnsStructure logger = logging.getLogger(__name__) @@ -103,3 +104,106 @@ async def save_isa_table( cell = cell.strip('"') row[idx] = cell f.write("\t".join(row) + "\n") + + @staticmethod + def add_isa_table_columns( + mtbls_isa_table: IsaTable, + header_name: str, + new_column_index: Union[int, None] = None, + column_structure: ColumnsStructure = ColumnsStructure.ONTOLOGY_COLUMN, + default_value: Union[str, None] = None, + ) -> Tuple[List[str], int]: + if column_structure not in ( + ColumnsStructure.SINGLE_COLUMN_AND_UNIT_ONTOLOGY, + ColumnsStructure.ONTOLOGY_COLUMN, + ColumnsStructure.SINGLE_COLUMN, + ): + raise ValueError( + f"Column structure {column_structure.value} is not valid. " + ) + table_columns = mtbls_isa_table.columns + headers = mtbls_isa_table.headers + selected_index = len(table_columns) + if ( + new_column_index is not None + and new_column_index >= 0 + and len(table_columns) >= new_column_index + ): + selected_index = new_column_index + search_headers = [header_name] + new_column_names = [] + linked_column_names = [] + if column_structure == ColumnsStructure.SINGLE_COLUMN_AND_UNIT_ONTOLOGY: + linked_column_names = ["Unit", "Term Source REF", "Term Accession Number"] + elif column_structure == ColumnsStructure.ONTOLOGY_COLUMN: + linked_column_names = ["Term Source REF", "Term Accession Number"] + elif column_structure == ColumnsStructure.SINGLE_COLUMN: + linked_column_names = [] + + search_headers.extend(linked_column_names) + + default_value = default_value if default_value else "" + new_columns = [] + + for item in search_headers: + count = sum([1 for x in headers if x.column_header == item]) + new_column_name = f"{item}.{(count)}" if count > 0 else item + new_column_names.append(new_column_name) + + if item == header_name: + categories = header_name.split("[") + category = "" + if len(categories) > 1: + category = categories[0] + column_model = IsaTableColumn( + column_index=selected_index, + column_header=item, + column_name=new_column_name, + column_category=category, + column_prefix=category, + additional_columns=linked_column_names, + default_value=default_value, + column_structure=column_structure, + ) + else: + column_model = IsaTableColumn( + column_index=selected_index, + column_header=item, + column_name=new_column_name, + column_category="", + column_prefix="", + column_structure=ColumnsStructure.LINKED_COLUMN, + ) + + new_columns.append(column_model) + mtbls_isa_table.data[new_column_name] = [] + if mtbls_isa_table.columns and mtbls_isa_table.data: + first_column_data = mtbls_isa_table.data[mtbls_isa_table.columns[0]] + mtbls_isa_table.data[new_column_name] = [""] * len(first_column_data) + + if selected_index == len(table_columns): + updated_header_models = headers + new_columns + updated_columns = table_columns + new_column_names + else: + updated_header_models = ( + headers[:selected_index] + new_columns + headers[selected_index:] + ) + + updated_columns = ( + table_columns[:selected_index] + + new_column_names + + table_columns[selected_index:] + ) + + for idx, val in enumerate(updated_columns): + if idx >= len(table_columns): + table_columns.append(val) + headers.append(updated_header_models[idx]) + headers[idx].column_index = idx + else: + table_columns[idx] = val + headers[idx] = updated_header_models[idx] + headers[idx].column_index = idx + mtbls_isa_table.column_indices = [x for x in range(len(table_columns))] + + return new_column_names, selected_index