diff --git a/dicee/config.py b/dicee/config.py
index 3d7921cc..22cd7844 100644
--- a/dicee/config.py
+++ b/dicee/config.py
@@ -50,6 +50,9 @@ def __init__(self, **kwargs):
         self.backend: str = "pandas"
         """Backend to read, process, and index input knowledge graph. pandas, polars and rdflib available"""
 
+        self.separator: str = "\s+"
+        """Separator for extracting the head, relation, and tail from a triple"""
+
         self.trainer: str = 'torchCPUTrainer'
         """Trainer for knowledge graph embedding model"""
diff --git a/dicee/dataset_classes.py b/dicee/dataset_classes.py
index 33733d50..30729aa6 100644
--- a/dicee/dataset_classes.py
+++ b/dicee/dataset_classes.py
@@ -244,9 +244,7 @@ class OnevsAllDataset(torch.utils.data.Dataset):
 
     def __init__(self, train_set_idx: np.ndarray, entity_idxs):
         super().__init__()
-        assert isinstance(train_set_idx, np.memmap)
-
-        assert isinstance(train_set_idx, np.ndarray)
+        assert isinstance(train_set_idx, (np.memmap, np.ndarray))
         assert len(train_set_idx) > 0
         self.train_data = train_set_idx
         self.target_dim = len(entity_idxs)
@@ -300,8 +298,7 @@ def __init__(self, train_set_idx: np.ndarray, entity_idxs, relation_idxs, form,
                  label_smoothing_rate: float = 0.0):
         super().__init__()
         assert len(train_set_idx) > 0
-        assert isinstance(train_set_idx, np.memmap)
-        assert isinstance(train_set_idx, np.ndarray)
+        assert isinstance(train_set_idx, (np.memmap, np.ndarray))
         self.train_data = None
         self.train_target = None
         self.label_smoothing_rate = torch.tensor(label_smoothing_rate)
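
Editor's note: the new `separator` option defaults to the regex `\s+`, so any run of spaces or tabs delimits the three columns of a triple. A minimal sketch of what the pandas backend does with it, assuming a hypothetical whitespace-separated `train.txt`:

    import pandas as pd

    # "\s+" is interpreted by pandas as a regular expression, so mixed
    # spaces/tabs between subject, relation, and object all work.
    df = pd.read_csv("KGs/UMLS/train.txt", sep=r"\s+", header=None,
                     usecols=[0, 1, 2],
                     names=["subject", "relation", "object"], dtype=str)
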
diff --git a/dicee/executer.py b/dicee/executer.py
index 22d8acd5..bddbeda5 100644
--- a/dicee/executer.py
+++ b/dicee/executer.py
@@ -6,13 +6,13 @@
 import os
 import datetime
 from pytorch_lightning import seed_everything
-
-from dicee.knowledge_graph import KG
-from dicee.evaluator import Evaluator
+from .knowledge_graph import KG
+from .evaluator import Evaluator  # Avoid
-from dicee.static_preprocess_funcs import preprocesses_input_args
-from dicee.trainer import DICE_Trainer
-from dicee.static_funcs import timeit, continual_training_setup_executor, read_or_load_kg, load_json, store
+from .static_preprocess_funcs import preprocesses_input_args
+from .trainer import DICE_Trainer
+from .static_funcs import timeit, read_or_load_kg, load_json, store, create_experiment_folder
+import numpy as np
 
 logging.getLogger('pytorch_lightning').setLevel(0)
 warnings.filterwarnings(action="ignore", category=DeprecationWarning)
@@ -33,8 +33,10 @@ def __init__(self, args, continuous_training=False):
         seed_everything(args.random_seed, workers=True)
         # (3) Set the continual training flag
         self.is_continual_training = continuous_training
+        # TODO: self.storage_path vs self.args.full_storage_path. One of them is redundant.
+        self.storage_path = None
         # (4) Create an experiment folder or use the previous one
-        continual_training_setup_executor(self)
+        self.continual_training_setup_executor()
         # (5) A variable is initialized for pytorch lightning trainer or DICE_Trainer()
         self.trainer = None
         self.trained_model = None
@@ -42,12 +44,29 @@ def __init__(self, args, continuous_training=False):
         self.knowledge_graph = None
         # (7) Store few data in memory for numerical results, e.g. runtime, H@1 etc.
         self.report = dict()
-        # (8) Create an object to carry out link prediction evaluations
-        self.evaluator = None  # e.g. Evaluator(self)
+        # (8) Create an object to carry out link prediction evaluations, e.g. Evaluator(self)
+        self.evaluator = None
         # (9) Execution start time
         self.start_time = None
 
-    def read_or_load_kg(self):
+    def continual_training_setup_executor(self) -> None:
+        if self.is_continual_training:
+            # () We continue the training, then we store new models on the previous path.
+            self.storage_path = self.args.full_storage_path
+        else:
+            # Create a single directory containing KGE and all related data
+            if self.args.path_to_store_single_run:
+                os.makedirs(self.args.path_to_store_single_run, exist_ok=True)
+                self.args.full_storage_path = self.args.path_to_store_single_run
+            else:
+                # Create a parent and subdirectory.
+                self.args.full_storage_path = create_experiment_folder(folder_name=self.args.storage_path)
+            self.storage_path = self.args.full_storage_path
+        with open(self.args.full_storage_path + '/configuration.json', 'w') as file_descriptor:
+            temp = vars(self.args)
+            json.dump(temp, file_descriptor, indent=3)
+
+    def dept_read_or_load_kg(self):
         print('*** Read or Load Knowledge Graph ***')
         start_time = time.time()
         kg = KG(dataset_dir=self.args.dataset_dir,
@@ -84,9 +103,6 @@ def read_preprocess_index_serialize_data(self) -> None:
         None
 
         """
-        # (1) Read & Preprocess & Index & Serialize Input Data.
-        self.knowledge_graph = self.read_or_load_kg()
-
         # (2) Store the stats and share parameters
         self.args.num_entities = self.knowledge_graph.num_entities
         self.args.num_relations = self.knowledge_graph.num_relations
@@ -102,19 +118,6 @@
 
         self.report['runtime_kg_loading'] = time.time() - self.start_time
 
-    def load_indexed_data(self) -> None:
-        """ Load the indexed data from disk into memory
-
-        Parameter
-        ----------
-
-        Return
-        ----------
-        None
-
-        """
-        self.knowledge_graph = read_or_load_kg(self.args, cls=KG)
-
     @timeit
     def save_trained_model(self) -> None:
         """ Save a knowledge graph embedding model
@@ -188,6 +191,7 @@ def end(self, form_of_labelling: str) -> dict:
 
     def write_report(self) -> None:
         """ Report training related information in a report.json file """
+        # @TODO: Move to static funcs
         # Report total runtime.
         self.report['Runtime'] = time.time() - self.start_time
         print(f"Total Runtime: {self.report['Runtime']:.3f} seconds")
@@ -215,7 +219,39 @@ def start(self) -> dict:
         print(f"Start time:{datetime.datetime.now()}")
         # (1) Loading the Data
         # Load the indexed data from disk or read a raw data from disk into knowledge_graph attribute
-        self.load_indexed_data() if self.is_continual_training else self.read_preprocess_index_serialize_data()
+        if self.args.path_to_store_single_run and os.path.exists(self.args.path_to_store_single_run + "/memory_map_train_set.npy"):
+            # Read the JSON file describing the memory-mapped training set.
+            with open(self.args.path_to_store_single_run + '/memory_map_details.json', 'r') as file_descriptor:
+                memory_map_details = json.load(file_descriptor)
+            self.knowledge_graph = np.memmap(self.args.path_to_store_single_run + '/memory_map_train_set.npy',
+                                             mode='r',
+                                             dtype=memory_map_details["dtype"],
+                                             shape=tuple(memory_map_details["shape"]))
+            self.args.num_entities = memory_map_details["num_entities"]
+            self.args.num_relations = memory_map_details["num_relations"]
+            self.args.num_tokens = None
+            self.args.max_length_subword_tokens = None
+            self.args.ordered_bpe_entities = None
+        else:
+            self.knowledge_graph = read_or_load_kg(self.args, cls=KG)
+            if self.is_continual_training is False:
+                self.args.num_entities = self.knowledge_graph.num_entities
+                self.args.num_relations = self.knowledge_graph.num_relations
+                self.args.num_tokens = self.knowledge_graph.num_tokens
+                self.args.max_length_subword_tokens = self.knowledge_graph.max_length_subword_tokens
+                self.args.ordered_bpe_entities = self.knowledge_graph.ordered_bpe_entities
+                self.report['num_train_triples'] = len(self.knowledge_graph.train_set)
+                self.report['num_entities'] = self.knowledge_graph.num_entities
+                self.report['num_relations'] = self.knowledge_graph.num_relations
+                self.report['max_length_subword_tokens'] = self.knowledge_graph.max_length_subword_tokens if self.knowledge_graph.max_length_subword_tokens else None
+                self.report['runtime_kg_loading'] = time.time() - self.start_time
+            data = {"shape": tuple(self.knowledge_graph.train_set.shape),
+                    "dtype": self.knowledge_graph.train_set.dtype.str,
+                    "num_entities": self.knowledge_graph.num_entities,
+                    "num_relations": self.knowledge_graph.num_relations}
+            with open(self.args.full_storage_path + '/memory_map_details.json', 'w') as file_descriptor:
+                json.dump(data, file_descriptor, indent=4)
         # (2) Create an evaluator object.
         self.evaluator = Evaluator(args=self.args)
         # (3) Create a trainer object.
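
Editor's note: the branch above persists the indexed training triples as a NumPy memory map plus a small JSON file describing its dtype and shape, so a later run (or another DDP worker) can reopen the array without re-reading and re-indexing the KG. A minimal sketch of that round trip, using a hypothetical `run_dir`:

    import json
    import numpy as np

    run_dir = "Experiments/demo"  # hypothetical output directory
    train = np.array([[0, 0, 1], [1, 1, 2]], dtype=np.int64)
    # Write: copy the triples into a memory-mapped file and record its metadata.
    fp = np.memmap(run_dir + "/memory_map_train_set.npy", dtype=train.dtype,
                   mode="w+", shape=train.shape)
    fp[:] = train[:]
    fp.flush()
    with open(run_dir + "/memory_map_details.json", "w") as f:
        json.dump({"shape": train.shape, "dtype": train.dtype.str}, f)
    # Read: reopen read-only with the stored dtype and shape.
    with open(run_dir + "/memory_map_details.json") as f:
        details = json.load(f)
    reopened = np.memmap(run_dir + "/memory_map_train_set.npy", mode="r",
                         dtype=details["dtype"], shape=tuple(details["shape"]))
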
@@ -292,4 +328,4 @@ def continual_start(self) -> dict:
         else:
             self.evaluator = Evaluator(args=self.args, is_continual_training=True)
             self.evaluator.dummy_eval(self.trained_model, form_of_labelling)
-        return {**self.report, **self.evaluator.report}
\ No newline at end of file
+        return {**self.report, **self.evaluator.report}
diff --git a/dicee/knowledge_graph.py b/dicee/knowledge_graph.py
index a21586ab..b105cc00 100644
--- a/dicee/knowledge_graph.py
+++ b/dicee/knowledge_graph.py
@@ -3,8 +3,6 @@
 import sys
 import pandas as pd
 import polars as pl
-import numpy as np
-from .read_preprocess_save_load_kg.util import load_numpy_ndarray
 
 class KG:
     """ Knowledge Graph """
@@ -18,7 +16,7 @@ def __init__(self, dataset_dir: str = None,
                  add_reciprocal: bool = None, eval_model: str = None,
                  read_only_few: int = None, sample_triples_ratio: float = None,
                  path_for_serialization: str = None,
-                 entity_to_idx=None, relation_to_idx=None, backend=None, training_technique: str = None):
+                 entity_to_idx=None, relation_to_idx=None, backend=None, training_technique: str = None, separator: str = None):
         """
         :param dataset_dir: A path of a folder containing train.txt, valid.txt, test.text
         :param byte_pair_encoding: Apply Byte pair encoding.
@@ -36,13 +34,14 @@ def __init__(self, dataset_dir: str = None,
         :param training_technique
         """
         self.dataset_dir = dataset_dir
+        self.sparql_endpoint = sparql_endpoint
+        self.path_single_kg = path_single_kg
+        self.byte_pair_encoding = byte_pair_encoding
         self.ordered_shaped_bpe_tokens = None
-        self.sparql_endpoint = sparql_endpoint
         self.add_noise_rate = add_noise_rate
         self.num_entities = None
         self.num_relations = None
-        self.path_single_kg = path_single_kg
         self.path_for_deserialization = path_for_deserialization
         self.add_reciprocal = add_reciprocal
         self.eval_model = eval_model
@@ -72,6 +71,7 @@ def __init__(self, dataset_dir: str = None,
         self.target_dim = None
         self.train_target_indices = None
         self.ordered_bpe_entities = None
+        self.separator = separator
 
         if self.path_for_deserialization is None:
             # Read a knowledge graph into memory
@@ -83,8 +83,8 @@ def __init__(self, dataset_dir: str = None,
         else:
             LoadSaveToDisk(kg=self).load()
         assert len(self.train_set) > 0, "Training set is empty"
-        self._describe()
-
+        self.description_of_input = None
+        self.describe()
         if self.entity_to_idx is not None:
             assert isinstance(self.entity_to_idx, dict) or isinstance(self.entity_to_idx, pl.DataFrame), f"entity_to_idx must be a dict or a polars DataFrame: {type(self.entity_to_idx)}"
@@ -96,8 +96,8 @@ def __init__(self, dataset_dir: str = None,
             print(f"No inverse mapping created as self.entity_to_idx is not a type of dictionary but {type(self.entity_to_idx)}\n"
                   f"Backend might be selected as polars")
 
-    def _describe(self) -> None:
-        self.description_of_input = f'\n------------------- Description of Dataset {self.dataset_dir} -------------------'
+    def describe(self) -> None:
+        self.description_of_input = f'\n------------------- Description of Dataset {self.dataset_dir if isinstance(self.dataset_dir, str) else self.sparql_endpoint if isinstance(self.sparql_endpoint, str) else self.path_single_kg} -------------------'
         if self.byte_pair_encoding:
             self.description_of_input += f'\nNumber of tokens:{self.num_tokens}' \
                                          f'\nNumber of max sequence of sub-words: {self.max_length_subword_tokens}' \
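
Editor's note: with the constructor change above, callers are expected to pass the separator explicitly (the test changes at the end of this patch do exactly this). A minimal usage sketch, assuming `KGs/UMLS` holds whitespace-separated train/valid/test files:

    from dicee.knowledge_graph import KG

    # "\s+" is the pandas-style regex separator; the polars backend
    # would need a single character such as " " instead.
    kg = KG(dataset_dir="KGs/UMLS", backend="pandas", separator="\s+")
    print(kg.description_of_input)
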
diff --git a/dicee/read_preprocess_save_load_kg/read_from_disk.py b/dicee/read_preprocess_save_load_kg/read_from_disk.py
index 596517bf..5099d43c 100644
--- a/dicee/read_preprocess_save_load_kg/read_from_disk.py
+++ b/dicee/read_preprocess_save_load_kg/read_from_disk.py
@@ -24,11 +24,12 @@ def start(self) -> None:
         -------
         None
         """
+
         if self.kg.path_single_kg is not None:
             self.kg.raw_train_set = read_from_disk(self.kg.path_single_kg,
                                                    self.kg.read_only_few,
                                                    self.kg.sample_triples_ratio,
-                                                   backend=self.kg.backend)
+                                                   backend=self.kg.backend, separator=self.kg.separator)
             if self.kg.add_noise_rate:
                 self.add_noisy_triples_into_training()
             self.kg.raw_valid_set = None
@@ -41,13 +42,13 @@ def start(self) -> None:
             for i in glob.glob(self.kg.dataset_dir + '/*'):
                 if 'train' in i:
                     self.kg.raw_train_set = read_from_disk(i, self.kg.read_only_few, self.kg.sample_triples_ratio,
-                                                           backend=self.kg.backend)
+                                                           backend=self.kg.backend, separator=self.kg.separator)
                     if self.kg.add_noise_rate:
                         self.add_noisy_triples_into_training()
                 elif 'test' in i and self.kg.eval_model is not None:
-                    self.kg.raw_test_set = read_from_disk(i, backend=self.kg.backend)
+                    self.kg.raw_test_set = read_from_disk(i, backend=self.kg.backend, separator=self.kg.separator)
                 elif 'valid' in i and self.kg.eval_model is not None:
-                    self.kg.raw_valid_set = read_from_disk(i, backend=self.kg.backend)
+                    self.kg.raw_valid_set = read_from_disk(i, backend=self.kg.backend, separator=self.kg.separator)
                 else:
                     print(f'Not processed data: {i}')
         else:
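
Editor's note: the loop above classifies files purely by a substring match on the path, so a dataset directory is expected to contain files named like train.txt, valid.txt, and test.txt; anything else is skipped with a message. A small illustration of the matching rule:

    import glob

    # Hypothetical directory containing train.txt, valid.txt, test.txt, README.md
    for path in glob.glob("KGs/UMLS/*"):
        if "train" in path:
            print(path, "-> training split")
        elif "test" in path:
            print(path, "-> test split (only read if eval_model is set)")
        elif "valid" in path:
            print(path, "-> validation split (only read if eval_model is set)")
        else:
            print("Not processed data:", path)
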
diff --git a/dicee/read_preprocess_save_load_kg/util.py b/dicee/read_preprocess_save_load_kg/util.py
index 3661915c..a92098ce 100644
--- a/dicee/read_preprocess_save_load_kg/util.py
+++ b/dicee/read_preprocess_save_load_kg/util.py
@@ -109,8 +109,9 @@ def timeit_wrapper(*args, **kwargs):
 
 @timeit
-def read_with_polars(data_path, read_only_few: int = None, sample_triples_ratio: float = None) -> polars.DataFrame:
+def read_with_polars(data_path, read_only_few: int = None, sample_triples_ratio: float = None, separator: str = None) -> polars.DataFrame:
     """ Load and Preprocess via Polars """
+    assert separator is not None, "separator cannot be None"
     print(f'*** Reading {data_path} with Polars ***')
     # (1) Load the data.
     df = polars.read_csv(data_path,
@@ -120,7 +121,7 @@ def read_with_polars(data_path, read_only_few: int = None, sample_triples_ratio
                          columns=[0, 1, 2],
                          dtypes=[polars.String],
                          new_columns=['subject', 'relation', 'object'],
-                         separator=" ")  # \s+ doesn't work for polars
+                         separator=separator)  # \s+ doesn't work for polars
     # (2) Sample from (1).
     if sample_triples_ratio:
         print(f'Subsampling {sample_triples_ratio} of input data {df.shape}...')
@@ -135,13 +136,13 @@ def read_with_polars(data_path, read_only_few: int = None, sample_triples_ratio
 
 @timeit
-def read_with_pandas(data_path, read_only_few: int = None, sample_triples_ratio: float = None):
+def read_with_pandas(data_path, read_only_few: int = None, sample_triples_ratio: float = None, separator: str = None):
+    assert separator is not None, "separator cannot be None"
     print(f'*** Reading {data_path} with Pandas ***')
     if data_path[-3:] in [".nt", "ttl", 'txt', 'csv', 'zst']:
         print('Reading with pandas.read_csv with sep ** s+ ** ...')
-        # TODO: if byte_pair_encoding=True, we should not use "\s+" as seperator I guess
         df = pd.read_csv(data_path,
-                         sep="\s+",
+                         sep=separator,
                          header=None,
                          nrows=None if read_only_few is None else read_only_few,
                          usecols=[0, 1, 2],
@@ -171,8 +172,9 @@
 
 def read_from_disk(data_path: str, read_only_few: int = None,
-                   sample_triples_ratio: float = None, backend=None):
-    assert backend
+                   sample_triples_ratio: float = None, backend: str = None, separator: str = None):
+    assert backend is not None, "backend cannot be None"
+    assert separator is not None, f"separator cannot be None. Currently {separator}"
     # If path exits
     if glob.glob(data_path):
         # (1) Detect data format
@@ -180,20 +182,15 @@
         if dformat in ["ttl", "owl", "turtle", "rdf/xml"] and backend != "rdflib":
             raise RuntimeError(
                 f"Data with **{dformat}** format cannot be read via --backend pandas or polars. Use --backend rdflib")
-
         if backend == 'pandas':
-            return read_with_pandas(data_path, read_only_few, sample_triples_ratio)
+            return read_with_pandas(data_path, read_only_few, sample_triples_ratio, separator)
         elif backend == 'polars':
-            return read_with_polars(data_path, read_only_few, sample_triples_ratio)
+            return read_with_polars(data_path, read_only_few, sample_triples_ratio, separator)
         elif backend == "rdflib":
             # Lazy import
             from rdflib import Graph
-
-            try:
-                assert dformat in ["ttl", "owl", "nt", "turtle", "rdf/xml", "n3", " n-triples"]
-            except AssertionError:
-                raise AssertionError(f"--backend {backend} and dataformat **{dformat}** is not matching. "
-                                     f"Use --backend pandas")
+            assert dformat in ["ttl", "owl", "nt", "turtle", "rdf/xml", "n3", "n-triples"], \
+                f"--backend {backend} and data format **{dformat}** do not match. Use --backend pandas"
             return pd.DataFrame(data=[(str(s), str(p), str(o)) for s, p, o in Graph().parse(data_path)],
                                 columns=['subject', 'relation', 'object'], dtype=str)
     else:
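
Editor's note: as the retained comment says, polars cannot take the `\s+` regex — `polars.read_csv` only accepts a single-character separator, while `pandas.read_csv` treats a multi-character `sep` as a regular expression. A quick illustration of the constraint, with a hypothetical file path:

    import pandas as pd
    import polars

    # pandas: regex separators are fine.
    pd.read_csv("triples.txt", sep=r"\s+", header=None)
    # polars: the separator must be one character, e.g. " " or "\t".
    polars.read_csv("triples.txt", separator=" ", has_header=False)
    # polars.read_csv("triples.txt", separator="\s+", ...)  # would raise an error
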
diff --git a/dicee/sanity_checkers.py b/dicee/sanity_checkers.py
index c36b900b..70dc94b2 100644
--- a/dicee/sanity_checkers.py
+++ b/dicee/sanity_checkers.py
@@ -80,20 +80,11 @@ def validate_knowledge_graph(args):
 
 def sanity_checking_with_arguments(args):
-    try:
-        assert args.embedding_dim > 0
-    except AssertionError:
-        raise AssertionError(f'embedding_dim must be strictly positive. Currently:{args.embedding_dim}')
-
-    if args.scoring_technique not in ["AllvsAll", "1vsSample", "KvsSample", "KvsAll", "NegSample", "1vsAll", "Pyke", "Sentence"]:
-        raise KeyError(f'Invalid training strategy => {args.scoring_technique}.')
-
-    assert args.learning_rate > 0
+    assert args.embedding_dim > 0, f"embedding_dim must be strictly positive. Currently:{args.embedding_dim}"
+    assert args.scoring_technique in ["AllvsAll", "1vsSample", "KvsSample", "KvsAll", "NegSample", "1vsAll", "Pyke", "Sentence"], f"Invalid training strategy => {args.scoring_technique}."
+    assert args.learning_rate > 0, f"Learning rate must be greater than 0. Currently:{args.learning_rate}"
     if args.num_folds_for_cv is None:
         args.num_folds_for_cv = 0
-    try:
-        assert args.num_folds_for_cv >= 0
-    except AssertionError:
-        raise AssertionError(f'num_folds_for_cv can not be negative. Currently:{args.num_folds_for_cv}')
+    assert args.num_folds_for_cv >= 0, f"num_folds_for_cv cannot be negative. Currently:{args.num_folds_for_cv}"
     validate_knowledge_graph(args)
 
diff --git a/dicee/scripts/run.py b/dicee/scripts/run.py
index fef0e5c7..0103535e 100755
--- a/dicee/scripts/run.py
+++ b/dicee/scripts/run.py
@@ -4,8 +4,6 @@
 def get_default_arguments(description=None):
     """ Extends pytorch_lightning Trainer's arguments with ours """
-    # From "pytorch-lightning==1.6.4" to "lightning>=2.1.3", 'Trainer' has no attribute 'add_argparse_args'
-    # parser = pl.Trainer.add_argparse_args(argparse.ArgumentParser(add_help=False))
     parser = argparse.ArgumentParser(add_help=False)
     # Default Trainer param https://pytorch-lightning.readthedocs.io/en/stable/common/trainer.html#methods
     # Knowledge graph related arguments
@@ -14,19 +12,21 @@ def get_default_arguments(description=None):
                              ",e.g., KGs/UMLS")
     parser.add_argument("--sparql_endpoint", type=str, default=None,
                         help="An endpoint of a triple store, e.g. 'http://localhost:3030/mutagenesis/'. ")
-    parser.add_argument("--path_single_kg", type=str, default=None,
+    parser.add_argument("--path_single_kg", type=str, default=None,  # "/home/cdemir/Desktop/Softwares/dice-embeddings/dice.nt"
                         help="Path of a file corresponding to the input knowledge graph")
     # Saved files related arguments
-    parser.add_argument("--path_to_store_single_run", type=str, default=None,
+    parser.add_argument("--path_to_store_single_run", type=str, default=None,  # "DBpedia"
                         help="A single directory created that contains related data about embeddings.")
     parser.add_argument("--storage_path", type=str, default='Experiments',
                         help="A directory named with time of execution under --storage_path "
                              "that contains related data about embeddings.")
     parser.add_argument("--save_embeddings_as_csv", action="store_true",
                         help="A flag for saving embeddings in csv file.")
-    parser.add_argument("--backend", type=str, default="polars",
+    parser.add_argument("--backend", type=str, default="pandas",
                         choices=["pandas", "polars", "rdflib"],
                         help='Backend for loading, preprocessing, indexing input knowledge graph.')
+    parser.add_argument("--separator", type=str, default="\s+",
+                        help='Triple separator. pandas accepts a regex such as "\s+"; polars requires a single character such as " " or "\t".')
     # Model related arguments
     parser.add_argument("--model", type=str, default="Keci",
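
Editor's note: with the defaults changed above, an equivalent command-line run might look like the following (assuming the `dicee` console script that wraps this run.py; all flags are defined above):

    dicee --dataset_dir KGs/UMLS --backend pandas --separator "\s+" --model Keci

With `--backend polars`, a single-character separator such as " " must be given instead (see the test change at the end of this patch).
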
diff --git a/dicee/static_funcs.py b/dicee/static_funcs.py
index bafe8cbf..80654138 100644
--- a/dicee/static_funcs.py
+++ b/dicee/static_funcs.py
@@ -355,22 +355,24 @@ def add_noisy_triples(train_set: pd.DataFrame, add_noise_rate: float) -> pd.DataFrame:
     assert num_triples + num_noisy_triples == len(train_set)
     return train_set
 
-
 def read_or_load_kg(args, cls):
     print('*** Read or Load Knowledge Graph ***')
     start_time = time.time()
     kg = cls(dataset_dir=args.dataset_dir,
              byte_pair_encoding=args.byte_pair_encoding,
+             padding=True if args.byte_pair_encoding and args.model != "BytE" else False,
              add_noise_rate=args.add_noise_rate,
              sparql_endpoint=args.sparql_endpoint,
              path_single_kg=args.path_single_kg,
-             add_reciprical=args.apply_reciprical_or_noise,
+             add_reciprocal=args.apply_reciprical_or_noise,
             eval_model=args.eval_model,
             read_only_few=args.read_only_few,
             sample_triples_ratio=args.sample_triples_ratio,
             path_for_serialization=args.full_storage_path,
             path_for_deserialization=args.path_experiment_folder if hasattr(args, 'path_experiment_folder') else None,
-             backend=args.backend)
+             backend=args.backend,
+             training_technique=args.scoring_technique,
+             separator=args.separator)
     print(f'Preprocessing took: {time.time() - start_time:.3f} seconds')
     # (2) Share some info about data for easy access.
     print(kg.description_of_input)
@@ -528,19 +530,14 @@ def create_experiment_folder(folder_name='Experiments'):
 
 def continual_training_setup_executor(executor) -> None:
-    """
-    storage_path:str A path leading to a parent directory, where a subdirectory containing KGE related data
-
-    full_storage_path:str A path leading to a subdirectory containing KGE related data
-
-    """
+    # TODO:CD: Deprecate it
     if executor.is_continual_training:
         # (4.1) If it is continual, then store new models on previous path.
         executor.storage_path = executor.args.full_storage_path
     else:
         # Create a single directory containing KGE and all related data
         if executor.args.path_to_store_single_run:
-            os.makedirs(executor.args.path_to_store_single_run, exist_ok=True)
+            os.makedirs(executor.args.path_to_store_single_run, exist_ok=False)
             executor.args.full_storage_path = executor.args.path_to_store_single_run
         else:
             # Create a parent and subdirectory.
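
Editor's note: the keyword fix above matters because `KG.__init__` (patched earlier in this diff) declares `add_reciprocal`, so the old `add_reciprical` spelling would raise a TypeError as soon as a KG is constructed through this helper. A hedged usage sketch — `args` is assumed to be a fully preprocessed dicee Namespace carrying the new `separator` and `scoring_technique` fields:

    from dicee.knowledge_graph import KG
    from dicee.static_funcs import read_or_load_kg

    # Assumption: `args` has passed preprocesses_input_args / sanity checks,
    # so full_storage_path, apply_reciprical_or_noise, etc. are populated.
    kg = read_or_load_kg(args, cls=KG)
    print(kg.num_entities, kg.num_relations)
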
diff --git a/dicee/static_preprocess_funcs.py b/dicee/static_preprocess_funcs.py
index 7d657eec..f7717a0a 100644
--- a/dicee/static_preprocess_funcs.py
+++ b/dicee/static_preprocess_funcs.py
@@ -39,9 +39,7 @@ def preprocesses_input_args(args):
     assert args.weight_decay >= 0.0
     args.learning_rate = args.lr
     args.deterministic = True
-    assert args.init_param in ['xavier_normal', None]
-
-    # No need to eval. Investigate runtime performance
     args.check_val_every_n_epoch = 10 ** 6  # ,i.e., no eval
     assert args.add_noise_rate is None or isinstance(args.add_noise_rate, float)
@@ -51,10 +49,8 @@ def preprocesses_input_args(args):
                                    'train_val_test']
     except AssertionError:
         raise AssertionError(f'Unexpected input for eval_model ***\t{args.eval_model}\t***')
-
     if args.eval_model == 'None':
         args.eval_model = None
-
     # reciprocal checking
     if args.scoring_technique in ["AllvsAll", "1vsSample", "KvsAll", "1vsAll", "KvsSample"]:
         args.apply_reciprical_or_noise = True
@@ -62,7 +58,6 @@ def preprocesses_input_args(args):
         args.apply_reciprical_or_noise = False
     else:
         raise KeyError(f'Unexpected input for scoring_technique \t{args.scoring_technique}')
-
     if args.sample_triples_ratio is not None:
         assert 1.0 >= args.sample_triples_ratio >= 0.0
     assert args.backend in ["pandas", "polars", "rdflib"]
diff --git a/dicee/trainer/dice_trainer.py b/dicee/trainer/dice_trainer.py
index bc357cb3..78c78f43 100644
--- a/dicee/trainer/dice_trainer.py
+++ b/dicee/trainer/dice_trainer.py
@@ -1,6 +1,5 @@
 import lightning as pl
-
-import gc
+import polars
 from typing import Union
 from dicee.models.base_model import BaseKGE
 from dicee.static_funcs import select_model
@@ -17,6 +16,13 @@
 from ..knowledge_graph import KG
 import numpy as np
 
+
+def load_term_mapping(file_path: str):
+    return polars.read_csv(file_path + ".csv")
+
+
 def initialize_trainer(args, callbacks):
     if args.trainer == 'torchCPUTrainer':
         print('Initializing TorchTrainer CPU Trainer...', end='\t')
@@ -164,7 +170,7 @@ def continual_start(self):
         model, form_of_labelling = self.initialize_or_load_model()
         assert form_of_labelling in ['EntityPrediction', 'RelationPrediction', 'Pyke']
         assert self.args.scoring_technique in ["AllvsAll", "KvsSample", "1vsSample", "1vsAll", "KvsAll", "NegSample"]
-        train_loader = self.initialize_dataloader(
+        train_loader = self.init_dataloader(
             reload_dataset(path=self.storage_path, form_of_labelling=form_of_labelling,
                            scoring_technique=self.args.scoring_technique,
                            neg_ratio=self.args.neg_ratio,
@@ -186,7 +192,7 @@ def initialize_or_load_model(self):
         return model, form_of_labelling
 
     @timeit
-    def initialize_dataloader(self, dataset: torch.utils.data.Dataset) -> torch.utils.data.DataLoader:
+    def init_dataloader(self, dataset: torch.utils.data.Dataset) -> torch.utils.data.DataLoader:
         print('Initializing Dataloader...', end='\t')
         # https://pytorch.org/docs/stable/data.html#multi-process-data-loading
         # https://github.com/pytorch/pytorch/issues/13246#issuecomment-905703662
@@ -195,41 +201,57 @@ def initialize_dataloader(self, dataset: torch.utils.data.Dataset) -> torch.utils.data.DataLoader:
                                            num_workers=self.args.num_core,
                                            persistent_workers=False)
 
     @timeit
-    def initialize_dataset(self, dataset: KG, form_of_labelling) -> torch.utils.data.Dataset:
+    def init_dataset(self) -> torch.utils.data.Dataset:
         print('Initializing Dataset...', end='\t')
-        train_set_shape = dataset.train_set.shape
-        train_set_dtype = dataset.train_set.dtype
-
-        fp = np.memmap(dataset.path_for_serialization + '/memory_map_train_set.npy', dtype=train_set_dtype, mode='w+', shape=train_set_shape)
-        fp[:] = dataset.train_set[:]
-        dataset.train_set = fp
-        del fp
-
-        train_dataset = construct_dataset(train_set=dataset.train_set,
-                                          valid_set=dataset.valid_set,
-                                          test_set=dataset.test_set,
-                                          train_target_indices=dataset.train_target_indices,
-                                          target_dim=dataset.target_dim,
-                                          ordered_bpe_entities=dataset.ordered_bpe_entities,
-                                          entity_to_idx=dataset.entity_to_idx,
-                                          relation_to_idx=dataset.relation_to_idx,
-                                          form_of_labelling=form_of_labelling,
-                                          scoring_technique=self.args.scoring_technique,
-                                          neg_ratio=self.args.neg_ratio,
-                                          label_smoothing_rate=self.args.label_smoothing_rate,
-                                          byte_pair_encoding=self.args.byte_pair_encoding,
-                                          block_size=self.args.block_size)
-        # TODO: No need to keep the data in memory
-        if self.args.eval_model is None:
-            del dataset.train_set
-            gc.collect()
+        if isinstance(self.trainer.dataset, KG):
+            train_set_shape = self.trainer.dataset.train_set.shape
+            train_set_dtype = self.trainer.dataset.train_set.dtype
+            fp = np.memmap(self.trainer.dataset.path_for_serialization + '/memory_map_train_set.npy', dtype=train_set_dtype, mode='w+', shape=train_set_shape)
+            fp[:] = self.trainer.dataset.train_set[:]
+            self.trainer.dataset.train_set = fp
+            del fp
+            train_dataset = construct_dataset(train_set=self.trainer.dataset.train_set,
+                                              valid_set=self.trainer.dataset.valid_set,
+                                              test_set=self.trainer.dataset.test_set,
+                                              train_target_indices=self.trainer.dataset.train_target_indices,
+                                              target_dim=self.trainer.dataset.target_dim,
+                                              ordered_bpe_entities=self.trainer.dataset.ordered_bpe_entities,
+                                              entity_to_idx=self.trainer.dataset.entity_to_idx,
+                                              relation_to_idx=self.trainer.dataset.relation_to_idx,
+                                              form_of_labelling=self.trainer.form_of_labelling,
+                                              scoring_technique=self.args.scoring_technique,
+                                              neg_ratio=self.args.neg_ratio,
+                                              label_smoothing_rate=self.args.label_smoothing_rate,
+                                              byte_pair_encoding=self.args.byte_pair_encoding,
+                                              block_size=self.args.block_size)
+        else:
+            # The dataset is a np.memmap of indexed triples; term mappings are reloaded from disk.
+            train_dataset = construct_dataset(train_set=self.trainer.dataset,
+                                              valid_set=None,
+                                              test_set=None,
+                                              train_target_indices=None,
+                                              target_dim=None,
+                                              ordered_bpe_entities=None,
+                                              entity_to_idx=load_term_mapping(file_path=self.args.path_to_store_single_run + "/entity_to_idx"),
+                                              relation_to_idx=load_term_mapping(file_path=self.args.path_to_store_single_run + "/relation_to_idx"),
+                                              form_of_labelling=self.trainer.form_of_labelling,
+                                              scoring_technique=self.args.scoring_technique,
+                                              neg_ratio=self.args.neg_ratio,
+                                              label_smoothing_rate=self.args.label_smoothing_rate,
+                                              byte_pair_encoding=self.args.byte_pair_encoding,
+                                              block_size=self.args.block_size)
+        return train_dataset
 
-    def start(self, knowledge_graph: KG) -> Tuple[BaseKGE, str]:
-        """ Train selected model via the selected training strategy """
+    def start(self, knowledge_graph: Union[KG, np.memmap]) -> Tuple[BaseKGE, str]:
+        """ Train selected model via the selected training strategy.
+
+        In a DDP setup, the already read and indexed KG is reloaded here as a memory map.
+        """
         print('------------------- Train -------------------')
+        assert isinstance(knowledge_graph, np.memmap) or isinstance(knowledge_graph, KG)
         if self.args.num_folds_for_cv == 0:
             # Initialize Trainer
             self.trainer: Union[TorchTrainer, TorchDDPTrainer, pl.Trainer]
@@ -239,8 +261,7 @@ def start(self, knowledge_graph: KG) -> Tuple[BaseKGE, str]:
             self.trainer.evaluator = self.evaluator
             self.trainer.dataset = knowledge_graph
             self.trainer.form_of_labelling = form_of_labelling
-            self.trainer.fit(model, train_dataloaders=self.initialize_dataloader(
-                self.initialize_dataset(knowledge_graph, form_of_labelling)))
+            self.trainer.fit(model, train_dataloaders=self.init_dataloader(self.init_dataset()))
             return model, form_of_labelling
         else:
             return self.k_fold_cross_validation(knowledge_graph)
@@ -278,7 +299,7 @@ def k_fold_cross_validation(self, dataset) -> Tuple[BaseKGE, str]:
             train_set_for_i_th_fold, test_set_for_i_th_fold = dataset.train_set[train_index], dataset.train_set[
                 test_index]
 
-            trainer.fit(model, train_dataloaders=self.initialize_dataloader(
+            trainer.fit(model, train_dataloaders=self.init_dataloader(
                 construct_dataset(train_set=train_set_for_i_th_fold,
                                   entity_to_idx=dataset.entity_to_idx,
                                   relation_to_idx=dataset.relation_to_idx,
diff --git a/dicee/trainer/torch_trainer_ddp.py b/dicee/trainer/torch_trainer_ddp.py
index 4875a39b..26825af4 100644
--- a/dicee/trainer/torch_trainer_ddp.py
+++ b/dicee/trainer/torch_trainer_ddp.py
@@ -61,7 +61,6 @@ def fit(self, *args, **kwargs):
                                                       collate_fn=kwargs['train_dataloaders'].dataset.collate_fn,
                                                       sampler=torch.utils.data.distributed.DistributedSampler(
                                                           train_dataset_loader.dataset))
-
         # (3) Start NodeTrainer.
         NodeTrainer(self, model, train_dataset_loader, self.callbacks, self.attributes.num_epochs).train()
         torch.distributed.destroy_process_group()
@@ -89,6 +88,9 @@ def __init__(self,
         self.model = torch.nn.parallel.DistributedDataParallel(self.model, device_ids=[self.local_rank], output_device=self.local_rank)
         self.num_epochs = num_epochs
         self.loss_history = []
+        # TODO: CD: This should be given as an input param
+        ptdtype = {'float32': torch.float32, 'bfloat16': torch.bfloat16, 'float16': torch.float16}["bfloat16"]
+        self.ctx = torch.amp.autocast(device_type="cuda", dtype=ptdtype)
 
     def _load_snapshot(self, snapshot_path):
         raise NotImplementedError
@@ -108,9 +110,10 @@ def _run_batch(self, source: torch.LongTensor, targets: torch.FloatTensor):
 
         """
         self.optimizer.zero_grad()
-        output = self.model(source)
-        loss = self.loss_func(output, targets)
-        batch_loss = loss.item()
+        with self.ctx:
+            output = self.model(source)
+            loss = self.loss_func(output, targets)
+            batch_loss = loss.item()
         loss.backward()
         self.optimizer.step()
         return batch_loss
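
Editor's note: the autocast context added above enables mixed-precision forward passes (bfloat16, hard-coded pending the noted TODO). A minimal standalone sketch of the same pattern, assuming a CUDA device is available:

    import torch

    model = torch.nn.Linear(32, 8).cuda()
    loss_func = torch.nn.functional.mse_loss
    optimizer = torch.optim.SGD(model.parameters(), lr=0.1)
    source = torch.randn(4, 32, device="cuda")
    targets = torch.randn(4, 8, device="cuda")

    ctx = torch.amp.autocast(device_type="cuda", dtype=torch.bfloat16)
    optimizer.zero_grad()
    with ctx:
        # Forward pass and loss computation run in bfloat16 where safe.
        output = model(source)
        loss = loss_func(output, targets)
    # Backward and optimizer step stay outside the autocast region.
    loss.backward()
    optimizer.step()
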
diff --git a/tests/test_continual_training.py b/tests/test_continual_training.py
index c8c63fd5..5aca93e0 100644
--- a/tests/test_continual_training.py
+++ b/tests/test_continual_training.py
@@ -29,7 +29,7 @@ def test_negative_sampling(self):
         result = Execute(args).start()
         assert os.path.isdir(result['path_experiment_folder'])
         pre_trained_kge = KGE(path=result['path_experiment_folder'])
-        kg = KG(dataset_dir=args.dataset_dir)
+        kg = KG(dataset_dir=args.dataset_dir, separator="\s+")
         pre_trained_kge.train(kg, epoch=1, batch_size=args.batch_size)
 
     @pytest.mark.filterwarnings('ignore::UserWarning')
@@ -54,5 +54,5 @@ def test_negative_sampling_Family(self):
         assert os.path.isdir(result['path_experiment_folder'])
         pre_trained_kge = KGE(path=result['path_experiment_folder'])
         kg = KG(args.dataset_dir, entity_to_idx=pre_trained_kge.entity_to_idx,
-                relation_to_idx=pre_trained_kge.relation_to_idx)
+                relation_to_idx=pre_trained_kge.relation_to_idx, separator="\s+")
         pre_trained_kge.train(kg, epoch=1, batch_size=args.batch_size)
diff --git a/tests/test_different_backends.py b/tests/test_different_backends.py
index 8b40bcc3..a68210ee 100644
--- a/tests/test_different_backends.py
+++ b/tests/test_different_backends.py
@@ -28,4 +28,5 @@ def test_pandas_as_backend(self):
         args = Namespace()
         args.dataset_dir = 'KGs/UMLS'
         args.backend = 'polars'
+        args.separator = " "
         Execute(args).start()
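
Editor's note: as the test change illustrates, programmatic runs now pair each backend with a compatible separator. A hedged end-to-end sketch, using the Namespace config class the tests above rely on:

    from dicee.config import Namespace
    from dicee.executer import Execute

    args = Namespace()
    args.dataset_dir = 'KGs/UMLS'
    args.backend = 'pandas'
    args.separator = "\s+"  # with backend='polars', use a single character such as " "
    report = Execute(args).start()
    print(report['path_experiment_folder'])
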