From 37f06f6f6a48384916669dfde62bd64c55846468 Mon Sep 17 00:00:00 2001
From: Caglar Demir
Date: Wed, 23 Oct 2024 13:41:31 +0200
Subject: [PATCH 1/5] WIP: MultiGPU training with memory map

---
 dicee/dataset_classes.py                   |  7 +-
 dicee/executer.py                          | 78 +++++++++++++++-------
 dicee/knowledge_graph.py                   | 12 ++--
 dicee/read_preprocess_save_load_kg/util.py |  2 +-
 dicee/sanity_checkers.py                   | 17 ++---
 dicee/static_funcs.py                      | 16 ++---
 dicee/static_preprocess_funcs.py           |  5 --
 dicee/trainer/dice_trainer.py              | 68 ++++++++++---------
 8 files changed, 108 insertions(+), 97 deletions(-)

diff --git a/dicee/dataset_classes.py b/dicee/dataset_classes.py
index 33733d50..30729aa6 100644
--- a/dicee/dataset_classes.py
+++ b/dicee/dataset_classes.py
@@ -244,9 +244,7 @@ class OnevsAllDataset(torch.utils.data.Dataset):
 
     def __init__(self, train_set_idx: np.ndarray, entity_idxs):
         super().__init__()
-        assert isinstance(train_set_idx, np.memmap)
-
-        assert isinstance(train_set_idx, np.ndarray)
+        assert isinstance(train_set_idx, np.memmap) or isinstance(train_set_idx, np.ndarray)
         assert len(train_set_idx) > 0
         self.train_data = train_set_idx
         self.target_dim = len(entity_idxs)
@@ -300,8 +298,7 @@ def __init__(self, train_set_idx: np.ndarray, entity_idxs, relation_idxs, form,
                  label_smoothing_rate: float = 0.0):
         super().__init__()
         assert len(train_set_idx) > 0
-        assert isinstance(train_set_idx, np.memmap)
-        assert isinstance(train_set_idx, np.ndarray)
+        assert isinstance(train_set_idx, np.memmap) or isinstance(train_set_idx, np.ndarray)
         self.train_data = None
         self.train_target = None
         self.label_smoothing_rate = torch.tensor(label_smoothing_rate)
diff --git a/dicee/executer.py b/dicee/executer.py
index 22d8acd5..eb8b18bb 100644
--- a/dicee/executer.py
+++ b/dicee/executer.py
@@ -6,13 +6,12 @@ import os
 import datetime
 from pytorch_lightning import seed_everything
-
-from dicee.knowledge_graph import KG
-from dicee.evaluator import Evaluator
+from .knowledge_graph import KG
+from .evaluator import Evaluator
 # Avoid
-from dicee.static_preprocess_funcs import preprocesses_input_args
-from dicee.trainer import DICE_Trainer
-from dicee.static_funcs import timeit, continual_training_setup_executor, read_or_load_kg, load_json, store
+from .static_preprocess_funcs import preprocesses_input_args
+from .trainer import DICE_Trainer
+from .static_funcs import timeit, continual_training_setup_executor, read_or_load_kg, load_json, store,create_experiment_folder
 
 logging.getLogger('pytorch_lightning').setLevel(0)
 warnings.filterwarnings(action="ignore", category=DeprecationWarning)
@@ -34,7 +33,7 @@ def __init__(self, args, continuous_training=False):
         # (3) Set the continual training flag
         self.is_continual_training = continuous_training
         # (4) Create an experiment folder or use the previous one
-        continual_training_setup_executor(self)
+        self.continual_training_setup_executor()
         # (5) A variable is initialized for pytorch lightning trainer or DICE_Trainer()
         self.trainer = None
         self.trained_model = None
@@ -46,8 +45,36 @@ def __init__(self, args, continuous_training=False):
         self.evaluator = None  # e.g. Evaluator(self)
         # (9) Execution start time
         self.start_time = None
+        self.multi_gpu_ddp_training_mode=False
 
-    def read_or_load_kg(self):
+    def continual_training_setup_executor(self) -> None:
+        if self.is_continual_training:
+            # () We continue the training; new models are stored on the previous path.
+            self.storage_path = self.args.full_storage_path
+        else:
+            # Create a single directory containing KGE and all related data
+            if self.args.path_to_store_single_run:
+                # () If the folder exists, we can assume that the memory map is already available.
+                # Therefore, we can skip reading the KG again.
+                if os.path.isdir(self.args.path_to_store_single_run):
+                    # If the path exists, we assume that we are in the multi-GPU training mode.
+                    # Therefore, we skip reading and preprocessing the input data simultaneously
+                    # on every available GPU (e.g., reading and indexing the input KG #GPUs-1 times simultaneously).
+                    self.multi_gpu_ddp_training_mode=True
+                else:
+                    os.makedirs(self.args.path_to_store_single_run, exist_ok=False)
+                    self.multi_gpu_ddp_training_mode=False
+
+                self.args.full_storage_path = self.args.path_to_store_single_run
+            else:
+                # Create a parent and subdirectory.
+                self.args.full_storage_path = create_experiment_folder(folder_name=self.args.storage_path)
+        self.storage_path = self.args.full_storage_path
+        with open(self.args.full_storage_path + '/configuration.json', 'w') as file_descriptor:
+            temp = vars(self.args)
+            json.dump(temp, file_descriptor, indent=3)
+
+    def dept_read_or_load_kg(self):
         print('*** Read or Load Knowledge Graph ***')
         start_time = time.time()
         kg = KG(dataset_dir=self.args.dataset_dir,
@@ -84,9 +111,6 @@ def read_preprocess_index_serialize_data(self) -> None:
         None
 
         """
-        # (1) Read & Preprocess & Index & Serialize Input Data.
-        self.knowledge_graph = self.read_or_load_kg()
-
         # (2) Store the stats and share parameters
         self.args.num_entities = self.knowledge_graph.num_entities
         self.args.num_relations = self.knowledge_graph.num_relations
@@ -102,19 +126,6 @@ def read_preprocess_index_serialize_data(self) -> None:
 
         self.report['runtime_kg_loading'] = time.time() - self.start_time
 
-    def load_indexed_data(self) -> None:
-        """ Load the indexed data from disk into memory
-
-        Parameter
-        ----------
-
-        Return
-        ----------
-        None
-
-        """
-        self.knowledge_graph = read_or_load_kg(self.args, cls=KG)
-
     @timeit
     def save_trained_model(self) -> None:
         """ Save a knowledge graph embedding model
@@ -215,7 +226,24 @@ def start(self) -> dict:
         print(f"Start time:{datetime.datetime.now()}")
         # (1) Loading the Data
         # Load the indexed data from disk or read a raw data from disk into knowledge_graph attribute
-        self.load_indexed_data() if self.is_continual_training else self.read_preprocess_index_serialize_data()
+        if self.multi_gpu_ddp_training_mode is False:
+            self.knowledge_graph = read_or_load_kg(self.args, cls=KG)
+            if self.is_continual_training is False:
+                self.args.num_entities = self.knowledge_graph.num_entities
+                self.args.num_relations = self.knowledge_graph.num_relations
+                self.args.num_tokens = self.knowledge_graph.num_tokens
+                self.args.max_length_subword_tokens = self.knowledge_graph.max_length_subword_tokens
+                self.args.ordered_bpe_entities = self.knowledge_graph.ordered_bpe_entities
+                self.report['num_train_triples'] = len(self.knowledge_graph.train_set)
+                self.report['num_entities'] = self.knowledge_graph.num_entities
+                self.report['num_relations'] = self.knowledge_graph.num_relations
+                self.report['num_relations'] = self.knowledge_graph.num_relations
+                self.report[
+                    'max_length_subword_tokens'] = self.knowledge_graph.max_length_subword_tokens if self.knowledge_graph.max_length_subword_tokens else None
+                self.report['runtime_kg_loading'] = time.time() - self.start_time
+        else:
+            # TODO: Ideally we only need to load the memory map.
+            self.knowledge_graph=self.args.full_storage_path
         # (2) Create an evaluator object.
         self.evaluator = Evaluator(args=self.args)
         # (3) Create a trainer object.
diff --git a/dicee/knowledge_graph.py b/dicee/knowledge_graph.py
index a21586ab..176c44aa 100644
--- a/dicee/knowledge_graph.py
+++ b/dicee/knowledge_graph.py
@@ -36,13 +36,14 @@ def __init__(self, dataset_dir: str = None,
         :param training_technique
         """
         self.dataset_dir = dataset_dir
+        self.sparql_endpoint = sparql_endpoint
+        self.path_single_kg = path_single_kg
+        self.byte_pair_encoding = byte_pair_encoding
         self.ordered_shaped_bpe_tokens = None
-        self.sparql_endpoint = sparql_endpoint
         self.add_noise_rate = add_noise_rate
         self.num_entities = None
         self.num_relations = None
-        self.path_single_kg = path_single_kg
         self.path_for_deserialization = path_for_deserialization
         self.add_reciprocal = add_reciprocal
         self.eval_model = eval_model
@@ -83,8 +84,7 @@ def __init__(self, dataset_dir: str = None,
         else:
             LoadSaveToDisk(kg=self).load()
         assert len(self.train_set) > 0, "Training set is empty"
-        self._describe()
-
+        self.describe()
         if self.entity_to_idx is not None:
             assert isinstance(self.entity_to_idx, dict) or isinstance(self.entity_to_idx,
                                                                       pl.DataFrame), f"entity_to_idx must be a dict or a polars DataFrame: {type(self.entity_to_idx)}"
@@ -96,8 +96,8 @@ def __init__(self, dataset_dir: str = None,
             print(f"No inverse mapping created as self.entity_to_idx is not a dictionary but {type(self.entity_to_idx)}\n"
                   f"Backend might be selected as polars")
 
-    def _describe(self) -> None:
-        self.description_of_input = f'\n------------------- Description of Dataset {self.dataset_dir} -------------------'
+    def describe(self) -> None:
+        self.description_of_input = f'\n------------------- Description of Dataset {self.dataset_dir if isinstance(self.dataset_dir, str) else self.sparql_endpoint if isinstance(self.sparql_endpoint, str) else self.path_single_kg} -------------------'
         if self.byte_pair_encoding:
             self.description_of_input += f'\nNumber of tokens:{self.num_tokens}' \
                                          f'\nNumber of max sequence of sub-words: {self.max_length_subword_tokens}' \
diff --git a/dicee/read_preprocess_save_load_kg/util.py b/dicee/read_preprocess_save_load_kg/util.py
index 3661915c..a9c42592 100644
--- a/dicee/read_preprocess_save_load_kg/util.py
+++ b/dicee/read_preprocess_save_load_kg/util.py
@@ -120,7 +120,7 @@ def read_with_polars(data_path, read_only_few: int = None, sample_triples_ratio: float = None) -> polars.DataFrame:
                          columns=[0, 1, 2],
                          dtypes=[polars.String],
                          new_columns=['subject', 'relation', 'object'],
-                         separator=" ")  # \s+ doesn't work for polars
+                         separator="\t")  # \s+ doesn't work for polars
     # (2) Sample from (1).
     if sample_triples_ratio:
         print(f'Subsampling {sample_triples_ratio} of input data {df.shape}...')
diff --git a/dicee/sanity_checkers.py b/dicee/sanity_checkers.py
index c36b900b..70dc94b2 100644
--- a/dicee/sanity_checkers.py
+++ b/dicee/sanity_checkers.py
@@ -80,20 +80,11 @@ def validate_knowledge_graph(args):
 
 
 def sanity_checking_with_arguments(args):
-    try:
-        assert args.embedding_dim > 0
-    except AssertionError:
-        raise AssertionError(f'embedding_dim must be strictly positive. Currently:{args.embedding_dim}')
-
-    if args.scoring_technique not in ["AllvsAll", "1vsSample", "KvsSample","KvsAll", "NegSample", "1vsAll", "Pyke", "Sentence"]:
-        raise KeyError(f'Invalid training strategy => {args.scoring_technique}.')
-
-    assert args.learning_rate > 0
+    assert args.embedding_dim > 0, f"embedding_dim must be strictly positive. Currently:{args.embedding_dim}"
+    assert args.scoring_technique in ["AllvsAll", "1vsSample", "KvsSample","KvsAll", "NegSample", "1vsAll","Pyke", "Sentence"], f"Invalid training strategy => {args.scoring_technique}."
+    assert args.learning_rate > 0, f"Learning rate must be greater than 0. Currently:{args.learning_rate}"
     if args.num_folds_for_cv is None:
         args.num_folds_for_cv = 0
-    try:
-        assert args.num_folds_for_cv >= 0
-    except AssertionError:
-        raise AssertionError(f'num_folds_for_cv can not be negative. Currently:{args.num_folds_for_cv}')
+    assert args.num_folds_for_cv >= 0, f"num_folds_for_cv cannot be negative. Currently:{args.num_folds_for_cv}"
     validate_knowledge_graph(args)
 
 
diff --git a/dicee/static_funcs.py b/dicee/static_funcs.py
index bafe8cbf..15d3636a 100644
--- a/dicee/static_funcs.py
+++ b/dicee/static_funcs.py
@@ -355,22 +355,23 @@ def add_noisy_triples(train_set: pd.DataFrame, add_noise_rate: float) -> pd.DataFrame:
     assert num_triples + num_noisy_triples == len(train_set)
     return train_set
 
-
 def read_or_load_kg(args, cls):
     print('*** Read or Load Knowledge Graph ***')
     start_time = time.time()
     kg = cls(dataset_dir=args.dataset_dir,
              byte_pair_encoding=args.byte_pair_encoding,
+             padding=True if args.byte_pair_encoding and args.model != "BytE" else False,
             add_noise_rate=args.add_noise_rate,
             sparql_endpoint=args.sparql_endpoint,
             path_single_kg=args.path_single_kg,
-            add_reciprical=args.apply_reciprical_or_noise,
+            add_reciprocal=args.apply_reciprical_or_noise,
             eval_model=args.eval_model,
             read_only_few=args.read_only_few,
             sample_triples_ratio=args.sample_triples_ratio,
             path_for_serialization=args.full_storage_path,
             path_for_deserialization=args.path_experiment_folder if hasattr(args, 'path_experiment_folder') else None,
-            backend=args.backend)
+            backend=args.backend,
+            training_technique=args.scoring_technique)
     print(f'Preprocessing took: {time.time() - start_time:.3f} seconds')
     # (2) Share some info about data for easy access.
     print(kg.description_of_input)
@@ -528,19 +529,14 @@ def create_experiment_folder(folder_name='Experiments'):
 
 
 def continual_training_setup_executor(executor) -> None:
-    """
-    storage_path:str A path leading to a parent directory, where a subdirectory containing KGE related data
-
-    full_storage_path:str A path leading to a subdirectory containing KGE related data
-
-    """
+    # TODO:CD:Deprecate it
     if executor.is_continual_training:
         # (4.1) If it is continual, then store new models on previous path.
         executor.storage_path = executor.args.full_storage_path
     else:
         # Create a single directory containing KGE and all related data
         if executor.args.path_to_store_single_run:
-            os.makedirs(executor.args.path_to_store_single_run, exist_ok=True)
+            os.makedirs(executor.args.path_to_store_single_run, exist_ok=False)
             executor.args.full_storage_path = executor.args.path_to_store_single_run
         else:
             # Create a parent and subdirectory.
diff --git a/dicee/static_preprocess_funcs.py b/dicee/static_preprocess_funcs.py
index 7d657eec..f7717a0a 100644
--- a/dicee/static_preprocess_funcs.py
+++ b/dicee/static_preprocess_funcs.py
@@ -39,9 +39,7 @@ def preprocesses_input_args(args):
     assert args.weight_decay >= 0.0
     args.learning_rate = args.lr
     args.deterministic = True
-
     assert args.init_param in ['xavier_normal', None]
-
     # No need to eval. Investigate runtime performance
     args.check_val_every_n_epoch = 10 ** 6  # ,i.e., no eval
     assert args.add_noise_rate is None or isinstance(args.add_noise_rate, float)
 
@@ -51,10 +49,8 @@ def preprocesses_input_args(args):
                                       'train_val_test']
     except AssertionError:
         raise AssertionError(f'Unexpected input for eval_model ***\t{args.eval_model}\t***')
-
     if args.eval_model == 'None':
         args.eval_model = None
-
     # reciprocal checking
     if args.scoring_technique in ["AllvsAll", "1vsSample", "KvsAll", "1vsAll", "KvsSample"]:
         args.apply_reciprical_or_noise = True
@@ -62,7 +58,6 @@ def preprocesses_input_args(args):
         args.apply_reciprical_or_noise = False
     else:
         raise KeyError(f'Unexpected input for scoring_technique \t{args.scoring_technique}')
-
     if args.sample_triples_ratio is not None:
         assert 1.0 >= args.sample_triples_ratio >= 0.0
     assert args.backend in ["pandas", "polars", "rdflib"]
diff --git a/dicee/trainer/dice_trainer.py b/dicee/trainer/dice_trainer.py
index bc357cb3..38598512 100644
--- a/dicee/trainer/dice_trainer.py
+++ b/dicee/trainer/dice_trainer.py
@@ -186,7 +186,7 @@ def initialize_or_load_model(self):
         return model, form_of_labelling
 
     @timeit
-    def initialize_dataloader(self, dataset: torch.utils.data.Dataset) -> torch.utils.data.DataLoader:
+    def init_dataloader(self, dataset: torch.utils.data.Dataset) -> torch.utils.data.DataLoader:
         print('Initializing Dataloader...', end='\t')
         # https://pytorch.org/docs/stable/data.html#multi-process-data-loading
         # https://github.com/pytorch/pytorch/issues/13246#issuecomment-905703662
@@ -195,41 +195,46 @@ def initialize_dataloader(self, dataset: torch.utils.data.Dataset) -> torch.utils.data.DataLoader:
                                            num_workers=self.args.num_core,
                                            persistent_workers=False)
 
     @timeit
-    def initialize_dataset(self, dataset: KG, form_of_labelling) -> torch.utils.data.Dataset:
+    def init_dataset(self) -> torch.utils.data.Dataset:
         print('Initializing Dataset...', end='\t')
-        train_set_shape=dataset.train_set.shape
-        train_set_dtype=dataset.train_set.dtype
-
-        fp = np.memmap(dataset.path_for_serialization + '/memory_map_train_set.npy', dtype=train_set_dtype, mode='w+', shape=train_set_shape)
-        fp[:] = dataset.train_set[:]
-        dataset.train_set=fp
-        del fp
-
+        if isinstance(self.trainer.dataset,KG):
+            train_set_shape=self.trainer.dataset.train_set.shape
+            train_set_dtype=self.trainer.dataset.train_set.dtype
+            fp = np.memmap(self.trainer.dataset.path_for_serialization + '/memory_map_train_set.npy', dtype=train_set_dtype, mode='w+', shape=train_set_shape)
+            fp[:] = self.trainer.dataset.train_set[:]
+            self.trainer.dataset.train_set=fp
+            del fp
+            train_dataset = construct_dataset(train_set=self.trainer.dataset.train_set,
+                                              valid_set=self.trainer.dataset.valid_set,
+                                              test_set=self.trainer.dataset.test_set,
+                                              train_target_indices=self.trainer.dataset.train_target_indices,
+                                              target_dim=self.trainer.dataset.target_dim,
+                                              ordered_bpe_entities=self.trainer.dataset.ordered_bpe_entities,
+                                              entity_to_idx=self.trainer.dataset.entity_to_idx,
+                                              relation_to_idx=self.trainer.dataset.relation_to_idx,
+                                              form_of_labelling=self.trainer.form_of_labelling,
+                                              scoring_technique=self.args.scoring_technique,
+                                              neg_ratio=self.args.neg_ratio,
+                                              label_smoothing_rate=self.args.label_smoothing_rate,
+                                              byte_pair_encoding=self.args.byte_pair_encoding,
+                                              block_size=self.args.block_size)
+            # TODO: No need to keep the data in memory
+            if self.args.eval_model is None:
+                del dataset.train_set
+                gc.collect()
+        else:
+            raise NotImplementedError("We need to select the data type and shape!")
-        train_dataset = construct_dataset(train_set=dataset.train_set,
-                                          valid_set=dataset.valid_set,
-                                          test_set=dataset.test_set,
-                                          train_target_indices=dataset.train_target_indices,
-                                          target_dim=dataset.target_dim,
-                                          ordered_bpe_entities=dataset.ordered_bpe_entities,
-                                          entity_to_idx=dataset.entity_to_idx,
-                                          relation_to_idx=dataset.relation_to_idx,
-                                          form_of_labelling=form_of_labelling,
-                                          scoring_technique=self.args.scoring_technique,
-                                          neg_ratio=self.args.neg_ratio,
-                                          label_smoothing_rate=self.args.label_smoothing_rate,
-                                          byte_pair_encoding=self.args.byte_pair_encoding,
-                                          block_size=self.args.block_size)
-        # TODO: No need to keep the data in memory
-        if self.args.eval_model is None:
-            del dataset.train_set
-            gc.collect()
 
         return train_dataset
 
-    def start(self, knowledge_graph: KG) -> Tuple[BaseKGE, str]:
+    def start(self, knowledge_graph: Union[KG,str]) -> Tuple[BaseKGE, str]:
+        """
+
+        In a DDP setup, we need to load the memory map of the already read/indexed KG.
+        """
         """ Train selected model via the selected training strategy """
         print('------------------- Train -------------------')
-
         if self.args.num_folds_for_cv == 0:
             # Initialize Trainer
             self.trainer: Union[TorchTrainer, TorchDDPTrainer, pl.Trainer]
@@ -239,8 +244,7 @@ def start(self, knowledge_graph: KG) -> Tuple[BaseKGE, str]:
             self.trainer.evaluator = self.evaluator
             self.trainer.dataset = knowledge_graph
             self.trainer.form_of_labelling = form_of_labelling
-            self.trainer.fit(model, train_dataloaders=self.initialize_dataloader(
-                self.initialize_dataset(knowledge_graph, form_of_labelling)))
+            self.trainer.fit(model, train_dataloaders=self.init_dataloader(self.init_dataset()))
             return model, form_of_labelling
         else:
             return self.k_fold_cross_validation(knowledge_graph)
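
Note on patch 1: the core idea is to materialize the indexed training triples as a NumPy memory map, so that DataLoader workers and, later, DDP ranks share a single on-disk copy instead of each holding the array in memory. A minimal sketch of the round trip that init_dataset performs (the file name follows the patch; the toy data and path are illustrative):

    import numpy as np

    # Toy indexed training set of (head, relation, tail) triples.
    train_set = np.random.randint(0, 100, size=(1000, 3), dtype=np.int64)

    # Write side: copy the array into an on-disk memory map and close it.
    fp = np.memmap("memory_map_train_set.npy", dtype=train_set.dtype,
                   mode="w+", shape=train_set.shape)
    fp[:] = train_set[:]
    fp.flush()
    del fp

    # Read side: reopen read-only; pages are faulted in lazily on access.
    reloaded = np.memmap("memory_map_train_set.npy", dtype=np.int64,
                         mode="r", shape=(1000, 3))
    assert (reloaded == train_set).all()

Note that mode='r' readers must already know the dtype and shape; that gap is exactly what the next commit fills with a JSON sidecar.
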
From 2ffb4c94166209436863a8b9ad14a97c2e18c54f Mon Sep 17 00:00:00 2001
From: Caglar Demir
Date: Wed, 23 Oct 2024 14:35:31 +0000
Subject: [PATCH 2/5] WIP: MultiGPUs training

---
 dicee/executer.py                          | 45 +++++++++++++---------
 dicee/read_preprocess_save_load_kg/util.py |  2 +-
 dicee/trainer/dice_trainer.py              | 28 ++++++++++++--
 3 files changed, 52 insertions(+), 23 deletions(-)

diff --git a/dicee/executer.py b/dicee/executer.py
index eb8b18bb..3512b501 100644
--- a/dicee/executer.py
+++ b/dicee/executer.py
@@ -12,6 +12,7 @@ from .static_preprocess_funcs import preprocesses_input_args
 from .trainer import DICE_Trainer
 from .static_funcs import timeit, continual_training_setup_executor, read_or_load_kg, load_json, store,create_experiment_folder
+import numpy as np
 
 logging.getLogger('pytorch_lightning').setLevel(0)
 warnings.filterwarnings(action="ignore", category=DeprecationWarning)
@@ -54,17 +55,7 @@ def continual_training_setup_executor(self) -> None:
         else:
             # Create a single directory containing KGE and all related data
             if self.args.path_to_store_single_run:
-                # () If the folder exists, we can assume that the memory map is already available.
-                # Therefore, we can skip reading the KG again.
-                if os.path.isdir(self.args.path_to_store_single_run):
-                    # If the path exists, we assume that we are in the multi-GPU training mode.
-                    # Therefore, we skip reading and preprocessing the input data simultaneously
-                    # on every available GPU (e.g., reading and indexing the input KG #GPUs-1 times simultaneously).
-                    self.multi_gpu_ddp_training_mode=True
-                else:
-                    os.makedirs(self.args.path_to_store_single_run, exist_ok=False)
-                    self.multi_gpu_ddp_training_mode=False
-
+                os.makedirs(self.args.path_to_store_single_run, exist_ok=True)
                 self.args.full_storage_path = self.args.path_to_store_single_run
             else:
                 # Create a parent and subdirectory.
@@ -226,7 +217,20 @@ def start(self) -> dict:
         print(f"Start time:{datetime.datetime.now()}")
         # (1) Loading the Data
         # Load the indexed data from disk or read a raw data from disk into knowledge_graph attribute
-        if self.multi_gpu_ddp_training_mode is False:
+        if os.path.exists(self.args.path_to_store_single_run+"/memory_map_train_set.npy"):
+            # Read the JSON file
+            with open(self.args.path_to_store_single_run+'/memory_map_details.json', 'r') as file_descriptor:
+                memory_map_details = json.load(file_descriptor)
+            self.knowledge_graph = np.memmap(self.args.path_to_store_single_run + '/memory_map_train_set.npy', dtype=memory_map_details["dtype"], mode='r', shape=tuple(memory_map_details["shape"]))
+            print(self.knowledge_graph[:10])
+
+            self.args.num_entities = memory_map_details["num_entities"]
+            self.args.num_relations = memory_map_details["num_relations"]
+            self.args.num_tokens = None
+            self.args.max_length_subword_tokens = None
+            self.args.ordered_bpe_entities = None
+
+        else:
             self.knowledge_graph = read_or_load_kg(self.args, cls=KG)
             if self.is_continual_training is False:
                 self.args.num_entities = self.knowledge_graph.num_entities
@@ -237,13 +241,16 @@ def start(self) -> dict:
             self.report['num_entities'] = self.knowledge_graph.num_entities
             self.report['num_relations'] = self.knowledge_graph.num_relations
-            self.report['num_relations'] = self.knowledge_graph.num_relations
-            self.report[
-                'max_length_subword_tokens'] = self.knowledge_graph.max_length_subword_tokens if self.knowledge_graph.max_length_subword_tokens else None
+            self.report['max_length_subword_tokens'] = self.knowledge_graph.max_length_subword_tokens if self.knowledge_graph.max_length_subword_tokens else None
             self.report['runtime_kg_loading'] = time.time() - self.start_time
-        else:
-            # TODO: Ideally we only need to load the memory map.
-            self.knowledge_graph=self.args.full_storage_path
+
+            data={"shape":tuple(self.knowledge_graph.train_set.shape),"dtype":self.knowledge_graph.train_set.dtype.str,
+                  "num_entities":self.knowledge_graph.num_entities,"num_relations":self.knowledge_graph.num_relations}
+            with open(self.args.full_storage_path + '/memory_map_details.json', 'w') as file_descriptor:
+                json.dump(data, file_descriptor, indent=4)
+            print("DEEEEMIRRR\n")
+
+            print(self.knowledge_graph.train_set[:10])
         # (2) Create an evaluator object.
         self.evaluator = Evaluator(args=self.args)
         # (3) Create a trainer object.
@@ -320,4 +327,4 @@ def continual_start(self) -> dict:
         else:
             self.evaluator = Evaluator(args=self.args, is_continual_training=True)
             self.evaluator.dummy_eval(self.trained_model, form_of_labelling)
-        return {**self.report, **self.evaluator.report}
\ No newline at end of file
+        return {**self.report, **self.evaluator.report}
diff --git a/dicee/read_preprocess_save_load_kg/util.py b/dicee/read_preprocess_save_load_kg/util.py
index a9c42592..3661915c 100644
--- a/dicee/read_preprocess_save_load_kg/util.py
+++ b/dicee/read_preprocess_save_load_kg/util.py
@@ -120,7 +120,7 @@ def read_with_polars(data_path, read_only_few: int = None, sample_triples_ratio: float = None) -> polars.DataFrame:
                          columns=[0, 1, 2],
                          dtypes=[polars.String],
                          new_columns=['subject', 'relation', 'object'],
-                         separator="\t")  # \s+ doesn't work for polars
+                         separator=" ")  # \s+ doesn't work for polars
     # (2) Sample from (1).
     if sample_triples_ratio:
         print(f'Subsampling {sample_triples_ratio} of input data {df.shape}...')
diff --git a/dicee/trainer/dice_trainer.py b/dicee/trainer/dice_trainer.py
index 38598512..b451ddf9 100644
--- a/dicee/trainer/dice_trainer.py
+++ b/dicee/trainer/dice_trainer.py
@@ -1,5 +1,5 @@
 import lightning as pl
-
+import polars
 import gc
 from typing import Union
 from dicee.models.base_model import BaseKGE
@@ -17,6 +17,13 @@
 from ..knowledge_graph import KG
 import numpy as np
 
+
+
+
+def load_term_mapping(file_path: str):
+    return polars.read_csv(file_path + ".csv")
+
+
 def initialize_trainer(args, callbacks):
     if args.trainer == 'torchCPUTrainer':
         print('Initializing TorchTrainer CPU Trainer...', end='\t')
@@ -225,10 +232,24 @@ def init_dataset(self) -> torch.utils.data.Dataset:
                                               label_smoothing_rate=self.args.label_smoothing_rate,
                                               byte_pair_encoding=self.args.byte_pair_encoding,
                                               block_size=self.args.block_size)
         else:
-            raise NotImplementedError("We need to select the data type and shape!")
+            train_dataset = construct_dataset(train_set=self.trainer.dataset,
+                                              valid_set=None,
+                                              test_set=None,
+                                              train_target_indices=None,#self.trainer.dataset.train_target_indices,
+                                              target_dim=None,#self.trainer.dataset.target_dim,
+                                              ordered_bpe_entities=None,#self.trainer.dataset.ordered_bpe_entities,
+                                              entity_to_idx=load_term_mapping(file_path=self.args.path_to_store_single_run + "/entity_to_idx"),#self.trainer.dataset.entity_to_idx,
+                                              relation_to_idx=load_term_mapping(file_path=self.args.path_to_store_single_run + "/relation_to_idx"),#self.trainer.dataset.relation_to_idx,
+                                              form_of_labelling=self.trainer.form_of_labelling,
+                                              scoring_technique=self.args.scoring_technique,
+                                              neg_ratio=self.args.neg_ratio,
+                                              label_smoothing_rate=self.args.label_smoothing_rate,
+                                              byte_pair_encoding=self.args.byte_pair_encoding,
+                                              block_size=self.args.block_size)
+
         return train_dataset
 
-    def start(self, knowledge_graph: Union[KG,str]) -> Tuple[BaseKGE, str]:
+    def start(self, knowledge_graph: Union[KG,np.memmap]) -> Tuple[BaseKGE, str]:
         """
 
         In a DDP setup, we need to load the memory map of the already read/indexed KG.
         """
         """ Train selected model via the selected training strategy """
         print('------------------- Train -------------------')
+        assert isinstance(knowledge_graph, np.memmap) or isinstance(knowledge_graph, KG)
         if self.args.num_folds_for_cv == 0:
             # Initialize Trainer
             self.trainer: Union[TorchTrainer, TorchDDPTrainer, pl.Trainer]
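
Note on patch 2: since a raw memory-mapped file carries no header, the commit persists the shape and dtype (together with the entity/relation counts) in a memory_map_details.json sidecar and reconstructs the map from it on the next start-up. Both directions, reduced to a sketch (the counts are made up):

    import json
    import numpy as np

    # First run: record everything a reader needs to rebuild the memmap.
    details = {"shape": (1000, 3), "dtype": "<i8",
               "num_entities": 135, "num_relations": 46}
    with open("memory_map_details.json", "w") as file_descriptor:
        json.dump(details, file_descriptor, indent=4)

    # Later run (e.g., on every DDP rank): rebuild the memmap without
    # re-reading or re-indexing the knowledge graph.
    with open("memory_map_details.json") as file_descriptor:
        details = json.load(file_descriptor)
    train_set = np.memmap("memory_map_train_set.npy", mode="r",
                          dtype=details["dtype"], shape=tuple(details["shape"]))
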
@@ -235,6 +256,7 @@ def start(self, knowledge_graph: Union[KG,str]) -> Tuple[BaseKGE, str]: """ """ Train selected model via the selected training strategy """ print('------------------- Train -------------------') + assert isinstance(knowledge_graph, np.memmap) or isinstance(knowledge_graph, KG) if self.args.num_folds_for_cv == 0: # Initialize Trainer self.trainer: Union[TorchTrainer, TorchDDPTrainer, pl.Trainer] From 6d84e19ca5008890ee377ea61b325475248240d7 Mon Sep 17 00:00:00 2001 From: Caglar Demir Date: Wed, 23 Oct 2024 21:24:53 +0200 Subject: [PATCH 3/5] WIP: multi-gpu memory map --- dicee/executer.py | 33 ++++++++++--------- dicee/knowledge_graph.py | 4 ++- .../read_from_disk.py | 9 ++--- dicee/read_preprocess_save_load_kg/util.py | 29 ++++++++-------- dicee/scripts/run.py | 10 +++--- dicee/static_funcs.py | 3 +- dicee/trainer/dice_trainer.py | 6 +--- dicee/trainer/torch_trainer_ddp.py | 11 ++++--- 8 files changed, 53 insertions(+), 52 deletions(-) diff --git a/dicee/executer.py b/dicee/executer.py index 3512b501..8c4f0aa1 100644 --- a/dicee/executer.py +++ b/dicee/executer.py @@ -11,7 +11,7 @@ # Avoid from .static_preprocess_funcs import preprocesses_input_args from .trainer import DICE_Trainer -from .static_funcs import timeit, continual_training_setup_executor, read_or_load_kg, load_json, store,create_experiment_folder +from .static_funcs import timeit, continual_training_setup_executor, read_or_load_kg, load_json, store, create_experiment_folder import numpy as np logging.getLogger('pytorch_lightning').setLevel(0) @@ -33,6 +33,8 @@ def __init__(self, args, continuous_training=False): seed_everything(args.random_seed, workers=True) # (3) Set the continual training flag self.is_continual_training = continuous_training + # TODO: self.storage_path vs self.args.full_storage_path. One of them is redudandant. + self.storage_path=None # (4) Create an experiment folder or use the previous one self.continual_training_setup_executor() # (5) A variable is initialized for pytorch lightning trainer or DICE_Trainer() @@ -42,11 +44,10 @@ def __init__(self, args, continuous_training=False): self.knowledge_graph = None # (7) Store few data in memory for numerical results, e.g. runtime, H@1 etc. self.report = dict() - # (8) Create an object to carry out link prediction evaluations - self.evaluator = None # e.g. Evaluator(self) + # (8) Create an object to carry out link prediction evaluations, e.g. Evaluator(self) + self.evaluator = None # (9) Execution start time self.start_time = None - self.multi_gpu_ddp_training_mode=False def continual_training_setup_executor(self) -> None: if self.is_continual_training: @@ -190,6 +191,7 @@ def end(self, form_of_labelling: str) -> dict: def write_report(self) -> None: """ Report training related information in a report.json file """ + # @TODO: Move to static funcs # Report total runtime. 
         self.report['Runtime'] = time.time() - self.start_time
         print(f"Total Runtime: {self.report['Runtime']:.3f} seconds")
@@ -217,19 +219,20 @@ def start(self) -> dict:
         print(f"Start time:{datetime.datetime.now()}")
         # (1) Loading the Data
         # Load the indexed data from disk or read a raw data from disk into knowledge_graph attribute
-        if os.path.exists(self.args.path_to_store_single_run+"/memory_map_train_set.npy"):
+        if self.args.path_to_store_single_run and os.path.exists(self.args.path_to_store_single_run+"/memory_map_train_set.npy"):
             # Read the JSON file
             with open(self.args.path_to_store_single_run+'/memory_map_details.json', 'r') as file_descriptor:
                 memory_map_details = json.load(file_descriptor)
-            self.knowledge_graph = np.memmap(self.args.path_to_store_single_run + '/memory_map_train_set.npy', dtype=memory_map_details["dtype"], mode='r', shape=tuple(memory_map_details["shape"]))
-            print(self.knowledge_graph[:10])
-
+            self.knowledge_graph = np.memmap(self.args.path_to_store_single_run + '/memory_map_train_set.npy',
+                                             mode='r',
+                                             dtype=memory_map_details["dtype"],
+                                             shape=tuple(memory_map_details["shape"]))
+            #
             self.args.num_entities = memory_map_details["num_entities"]
             self.args.num_relations = memory_map_details["num_relations"]
             self.args.num_tokens = None
             self.args.max_length_subword_tokens = None
-            self.args.ordered_bpe_entities = None
-
+            self.args.ordered_bpe_entities = None
         else:
             self.knowledge_graph = read_or_load_kg(self.args, cls=KG)
             if self.is_continual_training is False:
                 self.args.num_entities = self.knowledge_graph.num_entities
@@ -243,13 +246,14 @@ def start(self) -> dict:
             self.report['num_entities'] = self.knowledge_graph.num_entities
             self.report['num_relations'] = self.knowledge_graph.num_relations
             self.report['max_length_subword_tokens'] = self.knowledge_graph.max_length_subword_tokens if self.knowledge_graph.max_length_subword_tokens else None
             self.report['runtime_kg_loading'] = time.time() - self.start_time
-            data={"shape":tuple(self.knowledge_graph.train_set.shape),"dtype":self.knowledge_graph.train_set.dtype.str,
-                  "num_entities":self.knowledge_graph.num_entities,"num_relations":self.knowledge_graph.num_relations}
+            data={"shape":tuple(self.knowledge_graph.train_set.shape),
+                  "dtype":self.knowledge_graph.train_set.dtype.str,
+                  "num_entities":self.knowledge_graph.num_entities,
+                  "num_relations":self.knowledge_graph.num_relations}
             with open(self.args.full_storage_path + '/memory_map_details.json', 'w') as file_descriptor:
                 json.dump(data, file_descriptor, indent=4)
-            print("DEEEEMIRRR\n")
-
-            print(self.knowledge_graph.train_set[:10])
         # (2) Create an evaluator object.
         self.evaluator = Evaluator(args=self.args)
         # (3) Create a trainer object.
diff --git a/dicee/knowledge_graph.py b/dicee/knowledge_graph.py
index 176c44aa..199dc345 100644
--- a/dicee/knowledge_graph.py
+++ b/dicee/knowledge_graph.py
@@ -18,7 +18,7 @@ def __init__(self, dataset_dir: str = None,
                  add_reciprocal: bool = None, eval_model: str = None,
                  read_only_few: int = None, sample_triples_ratio: float = None,
                  path_for_serialization: str = None,
-                 entity_to_idx=None, relation_to_idx=None, backend=None, training_technique: str = None):
+                 entity_to_idx=None, relation_to_idx=None, backend=None, training_technique: str = None, separator:str=None):
         """
         :param dataset_dir: A path of a folder containing train.txt, valid.txt, test.text
         :param byte_pair_encoding: Apply Byte pair encoding.
@@ -73,6 +73,7 @@ def __init__(self, dataset_dir: str = None,
         self.target_dim = None
         self.train_target_indices = None
         self.ordered_bpe_entities = None
+        self.separator=separator
 
         if self.path_for_deserialization is None:
             # Read a knowledge graph into memory
@@ -83,6 +84,7 @@ def __init__(self, dataset_dir: str = None,
         else:
             LoadSaveToDisk(kg=self).load()
         assert len(self.train_set) > 0, "Training set is empty"
+        self.description_of_input=None
         self.describe()
         if self.entity_to_idx is not None:
             assert isinstance(self.entity_to_idx, dict) or isinstance(self.entity_to_idx,
diff --git a/dicee/read_preprocess_save_load_kg/read_from_disk.py b/dicee/read_preprocess_save_load_kg/read_from_disk.py
index 596517bf..5099d43c 100644
--- a/dicee/read_preprocess_save_load_kg/read_from_disk.py
+++ b/dicee/read_preprocess_save_load_kg/read_from_disk.py
@@ -24,11 +24,12 @@ def start(self) -> None:
         -------
         None
         """
+
         if self.kg.path_single_kg is not None:
             self.kg.raw_train_set = read_from_disk(self.kg.path_single_kg,
                                                    self.kg.read_only_few,
                                                    self.kg.sample_triples_ratio,
-                                                   backend=self.kg.backend)
+                                                   backend=self.kg.backend,separator=self.kg.separator)
             if self.kg.add_noise_rate:
                 self.add_noisy_triples_into_training()
             self.kg.raw_valid_set = None
@@ -41,13 +42,13 @@ def start(self) -> None:
             for i in glob.glob(self.kg.dataset_dir + '/*'):
                 if 'train' in i:
                     self.kg.raw_train_set = read_from_disk(i, self.kg.read_only_few, self.kg.sample_triples_ratio,
-                                                           backend=self.kg.backend)
+                                                           backend=self.kg.backend, separator=self.kg.separator)
                     if self.kg.add_noise_rate:
                         self.add_noisy_triples_into_training()
                 elif 'test' in i and self.kg.eval_model is not None:
-                    self.kg.raw_test_set = read_from_disk(i, backend=self.kg.backend)
+                    self.kg.raw_test_set = read_from_disk(i, backend=self.kg.backend, separator=self.kg.separator)
                 elif 'valid' in i and self.kg.eval_model is not None:
-                    self.kg.raw_valid_set = read_from_disk(i, backend=self.kg.backend)
+                    self.kg.raw_valid_set = read_from_disk(i, backend=self.kg.backend, separator=self.kg.separator)
                 else:
                     print(f'Not processed data: {i}')
         else:
diff --git a/dicee/read_preprocess_save_load_kg/util.py b/dicee/read_preprocess_save_load_kg/util.py
index 3661915c..a92098ce 100644
--- a/dicee/read_preprocess_save_load_kg/util.py
+++ b/dicee/read_preprocess_save_load_kg/util.py
@@ -109,8 +109,9 @@ def timeit_wrapper(*args, **kwargs):
 
 
 @timeit
-def read_with_polars(data_path, read_only_few: int = None, sample_triples_ratio: float = None) -> polars.DataFrame:
+def read_with_polars(data_path, read_only_few: int = None, sample_triples_ratio: float = None, separator:str=None) -> polars.DataFrame:
     """ Load and Preprocess via Polars """
+    assert separator is not None, "separator cannot be None"
     print(f'*** Reading {data_path} with Polars ***')
     # (1) Load the data.
     df = polars.read_csv(data_path,
@@ -120,7 +121,7 @@ def read_with_polars(data_path, read_only_few: int = None, sample_triples_ratio: float = None) -> polars.DataFrame:
                          columns=[0, 1, 2],
                          dtypes=[polars.String],
                          new_columns=['subject', 'relation', 'object'],
-                         separator=" ")  # \s+ doesn't work for polars
+                         separator=separator)  # \s+ doesn't work for polars
     # (2) Sample from (1).
     if sample_triples_ratio:
         print(f'Subsampling {sample_triples_ratio} of input data {df.shape}...')
@@ -135,13 +136,13 @@ def read_with_polars(data_path, read_only_few: int = None, sample_triples_ratio: float = None) -> polars.DataFrame:
 
 
 @timeit
-def read_with_pandas(data_path, read_only_few: int = None, sample_triples_ratio: float = None):
+def read_with_pandas(data_path, read_only_few: int = None, sample_triples_ratio: float = None,separator:str=None):
+    assert separator is not None, "separator cannot be None"
     print(f'*** Reading {data_path} with Pandas ***')
     if data_path[-3:] in [".nt","ttl", 'txt', 'csv', 'zst']:
         print('Reading with pandas.read_csv with sep ** s+ ** ...')
-        # TODO: if byte_pair_encoding=True, we should not use "\s+" as seperator I guess
         df = pd.read_csv(data_path,
-                         sep="\s+",
+                         sep=separator,#"\s+",
                          header=None,
                          nrows=None if read_only_few is None else read_only_few,
                          usecols=[0, 1, 2],
@@ -171,8 +172,9 @@ def read_with_pandas(data_path, read_only_few: int = None, sample_triples_ratio: float = None):
 
 
 def read_from_disk(data_path: str, read_only_few: int = None,
-                   sample_triples_ratio: float = None, backend=None):
-    assert backend
+                   sample_triples_ratio: float = None, backend:str=None,separator:str=None):
+    assert backend is not None, "backend cannot be None"
+    assert separator is not None, f"separator cannot be None. Currently {separator}"
     # If path exits
     if glob.glob(data_path):
         # (1) Detect data format
@@ -180,20 +182,15 @@ def read_from_disk(data_path: str, read_only_few: int = None,
         if dformat in ["ttl", "owl", "turtle", "rdf/xml"] and backend != "rdflib":
             raise RuntimeError(
                 f"Data with **{dformat}** format cannot be read via --backend pandas or polars. Use --backend rdflib")
-
         if backend == 'pandas':
-            return read_with_pandas(data_path, read_only_few, sample_triples_ratio)
+            return read_with_pandas(data_path, read_only_few, sample_triples_ratio, separator)
         elif backend == 'polars':
-            return read_with_polars(data_path, read_only_few, sample_triples_ratio)
+            return read_with_polars(data_path, read_only_few, sample_triples_ratio, separator)
         elif backend == "rdflib":
             # Lazy import
             from rdflib import Graph
-
-            try:
-                assert dformat in ["ttl", "owl", "nt", "turtle", "rdf/xml", "n3", " n-triples"]
-            except AssertionError:
-                raise AssertionError(f"--backend {backend} and dataformat **{dformat}** is not matching. "
-                                     f"Use --backend pandas")
+            assert dformat in ["ttl", "owl", "nt", "turtle", "rdf/xml", "n3", " n-triples"],\
+                f"--backend {backend} and dataformat **{dformat}** is not matching. Use --backend pandas"
             return pd.DataFrame(data=[(str(s), str(p), str(o)) for s, p, o in Graph().parse(data_path)],
                                 columns=['subject', 'relation', 'object'], dtype=str)
     else:
diff --git a/dicee/scripts/run.py b/dicee/scripts/run.py
index fef0e5c7..0103535e 100755
--- a/dicee/scripts/run.py
+++ b/dicee/scripts/run.py
@@ -4,8 +4,6 @@
 def get_default_arguments(description=None):
     """ Extends pytorch_lightning Trainer's arguments with ours """
-    # From "pytorch-lightning==1.6.4" to "lightning>=2.1.3", 'Trainer' has no attribute 'add_argparse_args'
-    # parser = pl.Trainer.add_argparse_args(argparse.ArgumentParser(add_help=False))
     parser = argparse.ArgumentParser(add_help=False)
     # Default Trainer param https://pytorch-lightning.readthedocs.io/en/stable/common/trainer.html#methods
     # Knowledge graph related arguments
@@ -14,19 +12,21 @@ def get_default_arguments(description=None):
                         ",e.g., KGs/UMLS")
     parser.add_argument("--sparql_endpoint", type=str, default=None,
                         help="An endpoint of a triple store, e.g. 'http://localhost:3030/mutagenesis/'. ")
") - parser.add_argument("--path_single_kg", type=str, default=None, + parser.add_argument("--path_single_kg", type=str, default=None,#"/home/cdemir/Desktop/Softwares/dice-embeddings/dice.nt", help="Path of a file corresponding to the input knowledge graph") # Saved files related arguments - parser.add_argument("--path_to_store_single_run", type=str, default=None, + parser.add_argument("--path_to_store_single_run", type=str, default=None,#"DBpedia", help="A single directory created that contains related data about embeddings.") parser.add_argument("--storage_path", type=str, default='Experiments', help="A directory named with time of execution under --storage_path " "that contains related data about embeddings.") parser.add_argument("--save_embeddings_as_csv", action="store_true", help="A flag for saving embeddings in csv file.") - parser.add_argument("--backend", type=str, default="polars", + parser.add_argument("--backend", type=str, default="pandas", choices=["pandas", "polars", "rdflib"], help='Backend for loading, preprocessing, indexing input knowledge graph.') + parser.add_argument("--separator", type=str, default="\s+", + help='Pandas \s+, t for \t polars works with the last two.') # Model related arguments parser.add_argument("--model", type=str, default="Keci", diff --git a/dicee/static_funcs.py b/dicee/static_funcs.py index 15d3636a..80654138 100644 --- a/dicee/static_funcs.py +++ b/dicee/static_funcs.py @@ -371,7 +371,8 @@ def read_or_load_kg(args, cls): path_for_serialization=args.full_storage_path, path_for_deserialization=args.path_experiment_folder if hasattr(args, 'path_experiment_folder') else None, backend=args.backend, - training_technique=args.scoring_technique) + training_technique=args.scoring_technique, + separator=args.separator) print(f'Preprocessing took: {time.time() - start_time:.3f} seconds') # (2) Share some info about data for easy access. print(kg.description_of_input) diff --git a/dicee/trainer/dice_trainer.py b/dicee/trainer/dice_trainer.py index b451ddf9..ae7fa691 100644 --- a/dicee/trainer/dice_trainer.py +++ b/dicee/trainer/dice_trainer.py @@ -171,7 +171,7 @@ def continual_start(self): model, form_of_labelling = self.initialize_or_load_model() assert form_of_labelling in ['EntityPrediction', 'RelationPrediction', 'Pyke'] assert self.args.scoring_technique in ["AllvsAll","KvsSample" ,"1vsSample", "1vsAll", "KvsAll", "NegSample"] - train_loader = self.initialize_dataloader( + train_loader = self.init_dataloader( reload_dataset(path=self.storage_path, form_of_labelling=form_of_labelling, scoring_technique=self.args.scoring_technique, neg_ratio=self.args.neg_ratio, @@ -225,10 +225,6 @@ def init_dataset(self) -> torch.utils.data.Dataset: label_smoothing_rate=self.args.label_smoothing_rate, byte_pair_encoding=self.args.byte_pair_encoding, block_size=self.args.block_size) - # TODO: No need to keep the data in memory - if self.args.eval_model is None: - del dataset.train_set - gc.collect() else: train_dataset = construct_dataset(train_set=self.trainer.dataset, valid_set=None, diff --git a/dicee/trainer/torch_trainer_ddp.py b/dicee/trainer/torch_trainer_ddp.py index 4875a39b..26825af4 100644 --- a/dicee/trainer/torch_trainer_ddp.py +++ b/dicee/trainer/torch_trainer_ddp.py @@ -61,7 +61,6 @@ def fit(self, *args, **kwargs): collate_fn=kwargs['train_dataloaders'].dataset.collate_fn, sampler=torch.utils.data.distributed.DistributedSampler( train_dataset_loader.dataset)) - # (3) Start NodeTrainer. 
From f992c349b30b8f488107aff0a8ee4b5a0e18ad88 Mon Sep 17 00:00:00 2001
From: Caglar Demir
Date: Wed, 23 Oct 2024 21:56:07 +0200
Subject: [PATCH 4/5] fixes for separator

---
 dicee/config.py                  | 3 +++
 tests/test_continual_training.py | 4 ++--
 tests/test_different_backends.py | 1 +
 3 files changed, 6 insertions(+), 2 deletions(-)

diff --git a/dicee/config.py b/dicee/config.py
index 3d7921cc..22cd7844 100644
--- a/dicee/config.py
+++ b/dicee/config.py
@@ -50,6 +50,9 @@ def __init__(self, **kwargs):
         self.backend: str = "pandas"
         """Backend to read, process, and index input knowledge graph. pandas, polars and rdflib available"""
 
+        self.separator: str = "\s+"
+        """Separator for extracting the head, relation, and tail from a triple"""
+
         self.trainer: str = 'torchCPUTrainer'
         """Trainer for knowledge graph embedding model"""
 
diff --git a/tests/test_continual_training.py b/tests/test_continual_training.py
index c8c63fd5..5aca93e0 100644
--- a/tests/test_continual_training.py
+++ b/tests/test_continual_training.py
@@ -29,7 +29,7 @@ def test_negative_sampling(self):
         result = Execute(args).start()
         assert os.path.isdir(result['path_experiment_folder'])
         pre_trained_kge = KGE(path=result['path_experiment_folder'])
-        kg = KG(dataset_dir=args.dataset_dir)
+        kg = KG(dataset_dir=args.dataset_dir,separator="\s+")
         pre_trained_kge.train(kg, epoch=1, batch_size=args.batch_size)
 
     @pytest.mark.filterwarnings('ignore::UserWarning')
@@ -54,5 +54,5 @@ def test_negative_sampling_Family(self):
         assert os.path.isdir(result['path_experiment_folder'])
         pre_trained_kge = KGE(path=result['path_experiment_folder'])
         kg = KG(args.dataset_dir, entity_to_idx=pre_trained_kge.entity_to_idx,
-                relation_to_idx=pre_trained_kge.relation_to_idx)
+                relation_to_idx=pre_trained_kge.relation_to_idx,separator="\s+")
         pre_trained_kge.train(kg, epoch=1, batch_size=args.batch_size)
diff --git a/tests/test_different_backends.py b/tests/test_different_backends.py
index 8b40bcc3..a68210ee 100644
--- a/tests/test_different_backends.py
+++ b/tests/test_different_backends.py
@@ -28,4 +28,5 @@ def test_pandas_as_backend(self):
         args = Namespace()
         args.dataset_dir = 'KGs/UMLS'
         args.backend = 'polars'
+        args.separator=" "
         Execute(args).start()
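
Note on patch 4: the new --separator option is threaded through the config and tests because the two CSV readers disagree: pandas' read_csv accepts a regular expression such as \s+, while polars' read_csv only takes a literal separator, e.g. a single space or a tab. A small sketch of the difference (tiny hypothetical triple file):

    import pandas as pd
    import polars

    with open("toy_triples.txt", "w") as f:
        f.write("a r1 b\nb r2 c\n")

    # pandas: regex separators are fine.
    df_pd = pd.read_csv("toy_triples.txt", sep=r"\s+", header=None,
                        usecols=[0, 1, 2],
                        names=["subject", "relation", "object"])

    # polars: the separator must be a single literal character.
    df_pl = polars.read_csv("toy_triples.txt", has_header=False, separator=" ",
                            new_columns=["subject", "relation", "object"])

    assert df_pd.shape == (2, 3) and df_pl.shape == (2, 3)
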
From 8e5b8407eae1a203422e38d590423656dbbd4554 Mon Sep 17 00:00:00 2001
From: Caglar Demir
Date: Wed, 23 Oct 2024 21:59:50 +0200
Subject: [PATCH 5/5] formatting fixes

---
 dicee/executer.py             | 2 +-
 dicee/knowledge_graph.py      | 2 --
 dicee/trainer/dice_trainer.py | 3 +--
 3 files changed, 2 insertions(+), 5 deletions(-)

diff --git a/dicee/executer.py b/dicee/executer.py
index 8c4f0aa1..bddbeda5 100644
--- a/dicee/executer.py
+++ b/dicee/executer.py
@@ -11,7 +11,7 @@
 # Avoid
 from .static_preprocess_funcs import preprocesses_input_args
 from .trainer import DICE_Trainer
-from .static_funcs import timeit, continual_training_setup_executor, read_or_load_kg, load_json, store, create_experiment_folder
+from .static_funcs import timeit, read_or_load_kg, load_json, store, create_experiment_folder
 import numpy as np
 
 logging.getLogger('pytorch_lightning').setLevel(0)
diff --git a/dicee/knowledge_graph.py b/dicee/knowledge_graph.py
index 199dc345..b105cc00 100644
--- a/dicee/knowledge_graph.py
+++ b/dicee/knowledge_graph.py
@@ -3,8 +3,6 @@
 import sys
 import pandas as pd
 import polars as pl
-import numpy as np
-from .read_preprocess_save_load_kg.util import load_numpy_ndarray
 
 class KG:
     """ Knowledge Graph """
diff --git a/dicee/trainer/dice_trainer.py b/dicee/trainer/dice_trainer.py
index ae7fa691..78c78f43 100644
--- a/dicee/trainer/dice_trainer.py
+++ b/dicee/trainer/dice_trainer.py
@@ -1,6 +1,5 @@
 import lightning as pl
 import polars
-import gc
 from typing import Union
 from dicee.models.base_model import BaseKGE
 from dicee.static_funcs import select_model
@@ -300,7 +299,7 @@ def k_fold_cross_validation(self, dataset) -> Tuple[BaseKGE, str]:
             train_set_for_i_th_fold, test_set_for_i_th_fold = dataset.train_set[train_index], dataset.train_set[
                 test_index]
 
-            trainer.fit(model, train_dataloaders=self.initialize_dataloader(
+            trainer.fit(model, train_dataloaders=self.init_dataloader(
                 construct_dataset(train_set=train_set_for_i_th_fold,
                                   entity_to_idx=dataset.entity_to_idx,
                                   relation_to_idx=dataset.relation_to_idx,
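
Closing note on the series: together with the memory map, the missing piece on the training side is sharding. The DDP trainer (torch_trainer_ddp.py) rebuilds the DataLoader with a DistributedSampler so that each rank iterates over a disjoint slice of the shared, memory-mapped training set. A self-contained sketch of that wiring (rank and world size normally come from torchrun/init_process_group; they are fixed here for illustration):

    import torch
    from torch.utils.data import DataLoader, TensorDataset
    from torch.utils.data.distributed import DistributedSampler

    # Stand-in for the memory-mapped (head, relation, tail) triples.
    dataset = TensorDataset(torch.arange(200).view(100, 2))
    sampler = DistributedSampler(dataset, num_replicas=4, rank=0, shuffle=True)
    loader = DataLoader(dataset, batch_size=8, sampler=sampler)

    for epoch in range(2):
        sampler.set_epoch(epoch)      # new shuffle each epoch, consistent across ranks
        for (batch,) in loader:
            pass                      # each rank sees ~100/4 = 25 samples per epoch
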