diff --git a/dicee/evaluator.py b/dicee/evaluator.py
index 3721e1b7..b005f20a 100644
--- a/dicee/evaluator.py
+++ b/dicee/evaluator.py
@@ -2,6 +2,8 @@
 import torch
 import numpy as np
 import json
+
+from . import load_json
 from .static_funcs import pickle
 from .static_funcs_training import evaluate_lp, evaluate_bpe_lp
 from typing import Tuple, List
@@ -67,6 +69,7 @@ def vocab_preparation(self, dataset) -> None:
 
     # @timeit
     def eval(self, dataset: KG, trained_model, form_of_labelling, during_training=False) -> None:
+        assert isinstance(dataset, KG), "dataset must be KG"
         # @TODO: Why this reassigment ?
         self.during_training = during_training
         # (1) Exit, if the flag is not set
@@ -140,6 +143,9 @@ def __load_and_set_mappings(self):
         self.er_vocab = pickle.load(open(self.args.full_storage_path + "/er_vocab.p", "rb"))
         self.re_vocab = pickle.load(open(self.args.full_storage_path + "/re_vocab.p", "rb"))
         self.ee_vocab = pickle.load(open(self.args.full_storage_path + "/ee_vocab.p", "rb"))
+        report = load_json(self.args.full_storage_path + "/report.json")
+        self.num_entities = report["num_entities"]
+        self.num_relations = report["num_relations"]
 
     def eval_rank_of_head_and_tail_entity(self, *, train_set, valid_set=None, test_set=None, trained_model):
         # 4. Test model on the training dataset if it is needed.
@@ -426,11 +432,17 @@ def evaluate_lp_bpe_k_vs_all(self, model, triples: List[List[str]], info=None, f
         return results
 
     def evaluate_lp(self, model, triple_idx, info: str):
-        return evaluate_lp(model, triple_idx, num_entities=self.num_entities, er_vocab=self.er_vocab,
+        assert self.num_entities is not None, "self.num_entities cannot be None"
+        assert self.er_vocab is not None, "self.er_vocab cannot be None"
+        assert self.re_vocab is not None, "self.re_vocab cannot be None"
+        return evaluate_lp(model, triple_idx,
+                           num_entities=self.num_entities,
+                           er_vocab=self.er_vocab,
                            re_vocab=self.re_vocab, info=info)
 
     def dummy_eval(self, trained_model, form_of_labelling: str):
-
+        assert trained_model is not None
+        # @TODO:CD: Why such naming? We need to document it better.
         if self.is_continual_training:
             self.__load_and_set_mappings()
@@ -448,6 +460,7 @@ def dummy_eval(self, trained_model, form_of_labelling: str):
                                                 trained_model=trained_model, form_of_labelling=form_of_labelling)
         else:
             raise ValueError(f'Invalid argument: {self.args.scoring_technique}')
+
         with open(self.args.full_storage_path + '/eval_report.json', 'w') as file_descriptor:
             json.dump(self.report, file_descriptor, indent=4)
diff --git a/dicee/executer.py b/dicee/executer.py
index 351c9798..ecc4bf0d 100644
--- a/dicee/executer.py
+++ b/dicee/executer.py
@@ -49,8 +49,11 @@ def __init__(self, args, continuous_training=False):
     def setup_executor(self) -> None:
         if self.is_continual_training is False:
             # Create a single directory containing KGE and all related data
-            if self.args.path_to_store_single_run:
-                os.makedirs(self.args.path_to_store_single_run, exist_ok=True)
+            if self.args.path_to_store_single_run is not None:
+                if os.path.exists(self.args.path_to_store_single_run):
+                    print(f"Deleting the existing directory {self.args.path_to_store_single_run}")
+                    os.system(f'rm -rf {self.args.path_to_store_single_run}')
+                os.makedirs(self.args.path_to_store_single_run, exist_ok=False)
                 self.args.full_storage_path = self.args.path_to_store_single_run
             else:
                 self.args.full_storage_path = create_experiment_folder(folder_name=self.args.storage_path)
@@ -59,35 +62,6 @@ def setup_executor(self) -> None:
             temp = vars(self.args)
             json.dump(temp, file_descriptor, indent=3)
 
-    def dept_read_preprocess_index_serialize_data(self) -> None:
-        """ Read & Preprocess & Index & Serialize Input Data
-
-        (1) Read or load the data from disk into memory.
-        (2) Store the statistics of the data.
-
-        Parameter
-        ----------
-
-        Return
-        ----------
-        None
-
-        """
-        # (2) Store the stats and share parameters
-        self.args.num_entities = self.knowledge_graph.num_entities
-        self.args.num_relations = self.knowledge_graph.num_relations
-        self.args.num_tokens = self.knowledge_graph.num_tokens
-        self.args.max_length_subword_tokens = self.knowledge_graph.max_length_subword_tokens
-        self.args.ordered_bpe_entities=self.knowledge_graph.ordered_bpe_entities
-        self.report['num_train_triples'] = len(self.knowledge_graph.train_set)
-        self.report['num_entities'] = self.knowledge_graph.num_entities
-        self.report['num_relations'] = self.knowledge_graph.num_relations
-        self.report['num_relations'] = self.knowledge_graph.num_relations
-        self.report[
-            'max_length_subword_tokens'] = self.knowledge_graph.max_length_subword_tokens if self.knowledge_graph.max_length_subword_tokens else None
-
-        self.report['runtime_kg_loading'] = time.time() - self.start_time
-
     @timeit
     def save_trained_model(self) -> None:
         """ Save a knowledge graph embedding model
@@ -113,19 +87,17 @@ def save_trained_model(self) -> None:
            self.report.update(self.trained_model.mem_of_model())
         # (3) Store/Serialize Model for further use.
         if self.is_continual_training is False:
-            store(trainer=self.trainer,
-                  trained_model=self.trained_model,
+            store(trained_model=self.trained_model,
                   model_name='model',
-                  full_storage_path=self.args.full_storage_path,#self.storage_path,
+                  full_storage_path=self.args.full_storage_path,
                   save_embeddings_as_csv=self.args.save_embeddings_as_csv)
         else:
-            store(trainer=self.trainer,
-                  trained_model=self.trained_model,
-                  model_name='model_' + str(datetime.datetime.now()),
-                  full_storage_path=self.args.full_storage_path,#self.storage_path,
+            store(trained_model=self.trained_model,
+                  model_name='model',  # + str(datetime.datetime.now()),
+                  full_storage_path=self.args.full_storage_path,
                   save_embeddings_as_csv=self.args.save_embeddings_as_csv)
 
-        self.report['path_experiment_folder'] = self.args.full_storage_path#self.storage_path
+        self.report['path_experiment_folder'] = self.args.full_storage_path
         self.report['num_entities'] = self.args.num_entities
         self.report['num_relations'] = self.args.num_relations
@@ -154,7 +126,8 @@ def end(self, form_of_labelling: str) -> dict:
             self.write_report()
             return {**self.report}
         else:
-            self.evaluator.eval(dataset=self.knowledge_graph, trained_model=self.trained_model,
+            self.evaluator.eval(dataset=self.knowledge_graph,
+                                trained_model=self.trained_model,
                                 form_of_labelling=form_of_labelling)
             self.write_report()
             return {**self.report, **self.evaluator.report}
@@ -246,7 +219,7 @@ class ContinuousExecute(Execute):
 
     def __init__(self, args):
         # (1) Current input configuration.
-        assert os.path.exists(args.continual_learning)
+        assert os.path.exists(args.continual_learning), f"Path does not exist: {args.continual_learning}"
         assert os.path.isfile(args.continual_learning + '/configuration.json')
         # (2) Load previous input configuration.
         previous_args = load_json(args.continual_learning + '/configuration.json')
@@ -303,7 +276,6 @@ def continual_start(self) -> dict:
             self.args.ordered_bpe_entities = None
 
         self.trained_model, form_of_labelling = self.trainer.continual_start(knowledge_graph)
-
         # (5) Store trained model.
         self.save_trained_model()
         # (6) Eval model.
diff --git a/dicee/models/ensemble.py b/dicee/models/ensemble.py
index 2cf4774e..b99022c0 100644
--- a/dicee/models/ensemble.py
+++ b/dicee/models/ensemble.py
@@ -1,22 +1,32 @@
 import torch
 import copy
-
+from typing import List
 class EnsembleKGE:
-    def __init__(self, seed_model):
-        self.models = []
-        self.optimizers = []
-        self.loss_history = []
-        for i in range(torch.cuda.device_count()):
-            i_model=copy.deepcopy(seed_model)
-            # TODO: Why we cant send the compile model to cpu ?
-            #i_model = torch.compile(i_model)
-            i_model.to(torch.device(f"cuda:{i}"))
-            self.optimizers.append(i_model.configure_optimizers())
-            self.models.append(i_model)
-        # Maybe use the original model's name ?
+    def __init__(self, seed_model=None, pretrained_models: List = None):
+
+        if seed_model is not None:
+            self.models = []
+            self.optimizers = []
+            self.loss_history = []
+            for i in range(torch.cuda.device_count()):
+                i_model = copy.deepcopy(seed_model)
+                # TODO: Why can't we send the compiled model to the CPU?
+                # i_model = torch.compile(i_model)
+                i_model.to(torch.device(f"cuda:{i}"))
+                self.optimizers.append(i_model.configure_optimizers())
+                self.models.append(i_model)
+        else:
+            assert pretrained_models is not None
+            self.models = pretrained_models
+            self.optimizers = []
+            self.loss_history = []
+
+            for i in range(torch.cuda.device_count()):
+                self.models[i].to(torch.device(f"cuda:{i}"))
+                self.optimizers.append(self.models[i].configure_optimizers())
+        # Maybe use the original model's name ?
         self.name=self.models[0].name
         self.train_mode=True
-
     def named_children(self):
         return self.models[0].named_children()
     @property
diff --git a/dicee/static_funcs.py b/dicee/static_funcs.py
index 7627fa13..8685c0cb 100644
--- a/dicee/static_funcs.py
+++ b/dicee/static_funcs.py
@@ -102,20 +102,40 @@ def select_model(args: dict, is_continual_training: bool = None, storage_path: s
     assert isinstance(is_continual_training, bool)
     assert isinstance(storage_path, str)
     if is_continual_training:
-        print('Loading pre-trained model...')
-        model, _ = intialize_model(args)
-        try:
-            weights = torch.load(storage_path + '/model.pt', torch.device('cpu'))
-            model.load_state_dict(weights)
-            for parameter in model.parameters():
-                parameter.requires_grad = True
-            model.train()
-        except FileNotFoundError:
-            print(f"{storage_path}/model.pt is not found. The model will be trained with random weights")
-        return model, _
+        # Check whether we have a tensor-parallelized KGE.
+        files_under_storage_path = [f for f in os.listdir(storage_path) if os.path.isfile(os.path.join(storage_path, f))]
+        num_of_partial_models_for_tensor_parallel = len([i for i in files_under_storage_path if "partial" in i])
+        if num_of_partial_models_for_tensor_parallel >= 1:
+            models = []
+            labelling_flag = None
+            for i in range(num_of_partial_models_for_tensor_parallel):
+                model, labelling_flag = intialize_model(args)
+                weights = torch.load(storage_path + f'/model_partial_{i}.pt', torch.device('cpu'), weights_only=False)
+                model.load_state_dict(weights)
+                for parameter in model.parameters():
+                    parameter.requires_grad = True
+                model.train()
+                models.append(model)
+            return EnsembleKGE(pretrained_models=models), labelling_flag
+        else:
+            print('Loading pre-trained model...')
+            model, labelling_flag = intialize_model(args)
+            try:
+                weights = torch.load(storage_path + '/model.pt', torch.device('cpu'))
+                model.load_state_dict(weights)
+                for parameter in model.parameters():
+                    parameter.requires_grad = True
+                model.train()
+            except FileNotFoundError as e:
+                print(f"{storage_path}/model.pt is not found.")
+                raise e
+            return model, labelling_flag
     else:
-        return intialize_model(args)
+        model, labelling_flag = intialize_model(args)
+        if args["trainer"] == "TP":
+            model = EnsembleKGE(seed_model=model)
+        return model, labelling_flag
 
 def load_model(path_of_experiment_folder: str, model_name='model.pt',verbose=0) -> Tuple[object, Tuple[dict, dict]]:
     """ Load weights and initialize pytorch module from namespace arguments"""
@@ -279,13 +299,9 @@ def numpy_data_type_changer(train_set: np.ndarray, num: int) -> np.ndarray:
 def save_checkpoint_model(model, path: str) -> None:
     """ Store Pytorch model into disk"""
     if isinstance(model, BaseKGE):
-        try:
-            torch.save(model.state_dict(), path)
-        except ReferenceError as e:
-            print(e)
-            print(model.name)
-            print('Could not save the model correctly')
+        torch.save(model.state_dict(), path)
     elif isinstance(model, EnsembleKGE):
+        # path ends with "model.pt"; each partial model is saved as "model_partial_{i}.pt".
         for i, partial_model in enumerate(model):
             new_path=path.replace("model.pt",f"model_partial_{i}.pt")
             torch.save(partial_model.state_dict(), new_path)
@@ -293,18 +309,8 @@ def save_checkpoint_model(model, path: str) -> None:
         torch.save(model.model.state_dict(), path)
 
 
-def store(trainer,
-          trained_model, model_name: str = 'model', full_storage_path: str = None,
+def store(trained_model, model_name: str = 'model', full_storage_path: str = None,
           save_embeddings_as_csv=False) -> None:
-    """
-    Store trained_model model and save embeddings into csv file.
-    :param trainer: an instance of trainer class
-    :param full_storage_path: path to save parameters.
-    :param model_name: string representation of the name of the model.
-    :param trained_model: an instance of BaseKGE see core.models.base_model .
-    :param save_embeddings_as_csv: for easy access of embeddings.
-    :return:
-    """
     assert full_storage_path is not None
     assert isinstance(model_name, str)
     assert len(model_name) > 1
diff --git a/dicee/static_funcs_training.py b/dicee/static_funcs_training.py
index 130ffc54..6b594e06 100644
--- a/dicee/static_funcs_training.py
+++ b/dicee/static_funcs_training.py
@@ -10,28 +10,18 @@ def make_iterable_verbose(iterable_object, verbose, desc="Default", position=Non
     return iterable_object
 
 @torch.no_grad()
-def evaluate_lp(model, triple_idx, num_entities, er_vocab: Dict[Tuple, List], re_vocab: Dict[Tuple, List],
+def evaluate_lp(model=None, triple_idx=None, num_entities=None, er_vocab: Dict[Tuple, List] = None,
+                re_vocab: Dict[Tuple, List] = None,
                 info='Eval Starts', batch_size=128, chunk_size=1000):
-    """
-    Evaluate model in a standard link prediction task
-
-    for each triple
-    the rank is computed by taking the mean of the filtered missing head entity rank and
-    the filtered missing tail entity rank
-    :param model:
-    :param triple_idx:
-    :param num_entities:
-    :param er_vocab:
-    :param re_vocab:
-    :param info:
-    :param batch_size:
-    :param chunk_size:
-    :return:
-    """
+    assert model is not None, "Model must be provided"
+    assert triple_idx is not None, "triple_idx must be provided"
+    assert num_entities is not None, "num_entities must be provided"
+    assert er_vocab is not None, "er_vocab must be provided"
+    assert re_vocab is not None, "re_vocab must be provided"
+
     model.eval()
     print(info)
     print(f'Num of triples {len(triple_idx)}')
-    print('** Evaluation with batching')
     hits = dict()
     reciprocal_ranks = []
     # Iterate over test triples
diff --git a/dicee/trainer/dice_trainer.py b/dicee/trainer/dice_trainer.py
index 3a764aba..2808e48a 100644
--- a/dicee/trainer/dice_trainer.py
+++ b/dicee/trainer/dice_trainer.py
@@ -136,7 +136,7 @@ class DICE_Trainer:
     report:dict
     """
 
-    def __init__(self, args, is_continual_training, storage_path, evaluator=None):
+    def __init__(self, args, is_continual_training: bool, storage_path, evaluator=None):
        self.report = dict()
        self.args = args
        self.trainer = None
@@ -170,7 +170,6 @@ def continual_start(self,knowledge_graph):
         self.trainer = self.initialize_trainer(callbacks=get_callbacks(self.args))
         model, form_of_labelling = self.initialize_or_load_model()
         # TODO: Here we need to load memory pag
-
         self.trainer.evaluator = self.evaluator
         self.trainer.dataset = knowledge_graph
         self.trainer.form_of_labelling = form_of_labelling
@@ -230,6 +229,8 @@ def init_dataset(self) -> torch.utils.data.Dataset:
                                        byte_pair_encoding=self.args.byte_pair_encoding,
                                        block_size=self.args.block_size)
         else:
+            assert isinstance(self.trainer.dataset, np.memmap), ("Train dataset must be an instance of np.memmap. "
" + f"Currently, {type(np.memmap)}!") if self.args.continual_learning: path = self.args.continual_learning else: @@ -276,6 +277,8 @@ def start(self, knowledge_graph: Union[KG,np.memmap]) -> Tuple[BaseKGE, str]: # TODO: Later, maybe we should write a callback to save the models in disk if isinstance(self.trainer, TensorParallel): + assert isinstance(model, EnsembleKGE), type(model) + model = self.trainer.fit(model, train_dataloaders=self.init_dataloader(self.init_dataset())) assert isinstance(model,EnsembleKGE) else: diff --git a/dicee/trainer/model_parallelism.py b/dicee/trainer/model_parallelism.py index cb78b24d..a900bfd7 100644 --- a/dicee/trainer/model_parallelism.py +++ b/dicee/trainer/model_parallelism.py @@ -134,17 +134,14 @@ def forward_backward_update_loss(z:Tuple, ensemble_model)->float: class TensorParallel(AbstractTrainer): def __init__(self, args, callbacks): super().__init__(args, callbacks) - self.models=[] - def get_ensemble(self): - return self.models def fit(self, *args, **kwargs): """ Train model """ assert len(args) == 1 - seed_model, = args - # () Init. ensemble model. - ensemble_model = EnsembleKGE(seed_model) + ensemble_model, = args + assert isinstance(ensemble_model,EnsembleKGE), (f"Selected model must " + f"be an instance of EnsembleKGE{type(ensemble_model)}") # () Run on_fit_start callbacks. self.on_fit_start(self, ensemble_model) # () Sanity checking diff --git a/tests/test_regression_model_paralelisim.py b/tests/test_regression_model_paralelisim.py index 2d7e3788..82c0a247 100644 --- a/tests/test_regression_model_paralelisim.py +++ b/tests/test_regression_model_paralelisim.py @@ -8,22 +8,22 @@ class TestRegressionTensorParallel: @pytest.mark.filterwarnings('ignore::UserWarning') def test_k_vs_all(self): if torch.cuda.is_available(): + os.system(f'rm -rf Keci_UMLS') args = Namespace() args.model = 'Keci' args.trainer = "TP" args.scoring_technique = "KvsAll" # 1vsAll, or AllvsAll, or NegSample args.dataset_dir = "KGs/UMLS" - # CD: TP currently doesn't work with path_to_store_single_run and eval. - #args.path_to_store_single_run = "Keci_UMLS" + args.path_to_store_single_run = "Keci_UMLS" + args.save_embeddings_as_csv=True args.optim="Adopt" - args.num_epochs = 10 + args.num_epochs = 100 args.embedding_dim = 32 args.batch_size = 32 args.lr=0.1 reports = Execute(args).start() assert reports["Train"]["MRR"] >= 0.60 assert reports["Test"]["MRR"] >= 0.58 - #assert os.path.exists("Keci_UMLS/entity_embeddings.csv") - #assert os.path.exists("Keci_UMLS/relation_embeddings.csv") - - #os.system(f'rm -rf Keci_UMLS') + assert os.path.exists("Keci_UMLS/Keci_entity_embeddings.csv") + assert os.path.exists("Keci_UMLS/Keci_relation_embeddings.csv") + os.system(f'rm -rf Keci_UMLS')