Skip to content

Commit

Permalink
Merge pull request #286 from dice-group/refactor
Browse files Browse the repository at this point in the history
Refactoring
  • Loading branch information
Demirrr authored Dec 3, 2024
2 parents 3332aec + 01e696b commit a766711
Show file tree
Hide file tree
Showing 8 changed files with 111 additions and 120 deletions.
17 changes: 15 additions & 2 deletions dicee/evaluator.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@
import torch
import numpy as np
import json

from . import load_json
from .static_funcs import pickle
from .static_funcs_training import evaluate_lp, evaluate_bpe_lp
from typing import Tuple, List
Expand Down Expand Up @@ -67,6 +69,7 @@ def vocab_preparation(self, dataset) -> None:

# @timeit
def eval(self, dataset: KG, trained_model, form_of_labelling, during_training=False) -> None:
assert isinstance(dataset, KG), "dataset must be KG"
# @TODO: Why this reassigment ?
self.during_training = during_training
# (1) Exit, if the flag is not set
Expand Down Expand Up @@ -140,6 +143,9 @@ def __load_and_set_mappings(self):
self.er_vocab = pickle.load(open(self.args.full_storage_path + "/er_vocab.p", "rb"))
self.re_vocab = pickle.load(open(self.args.full_storage_path + "/re_vocab.p", "rb"))
self.ee_vocab = pickle.load(open(self.args.full_storage_path + "/ee_vocab.p", "rb"))
report = load_json(self.args.full_storage_path + "/report.json")
self.num_entities = report["num_entities"]
self.num_relations = report["num_relations"]

def eval_rank_of_head_and_tail_entity(self, *, train_set, valid_set=None, test_set=None, trained_model):
# 4. Test model on the training dataset if it is needed.
Expand Down Expand Up @@ -426,11 +432,17 @@ def evaluate_lp_bpe_k_vs_all(self, model, triples: List[List[str]], info=None, f
return results

def evaluate_lp(self, model, triple_idx, info: str):
return evaluate_lp(model, triple_idx, num_entities=self.num_entities, er_vocab=self.er_vocab,
assert self.num_entities is not None, "self.num_entities cannot be None"
assert self.er_vocab is not None, "self.er_vocab cannot be None"
assert self.re_vocab is not None, "self.re_vocab cannot be None"
return evaluate_lp(model, triple_idx,
num_entities=self.num_entities,
er_vocab=self.er_vocab,
re_vocab=self.re_vocab, info=info)

def dummy_eval(self, trained_model, form_of_labelling: str):

assert trained_model is not None
# @TODO:CD: Why such naming! We need to document it better.
if self.is_continual_training:
self.__load_and_set_mappings()

Expand All @@ -448,6 +460,7 @@ def dummy_eval(self, trained_model, form_of_labelling: str):
trained_model=trained_model, form_of_labelling=form_of_labelling)
else:
raise ValueError(f'Invalid argument: {self.args.scoring_technique}')

with open(self.args.full_storage_path + '/eval_report.json', 'w') as file_descriptor:
json.dump(self.report, file_descriptor, indent=4)

Expand Down
56 changes: 14 additions & 42 deletions dicee/executer.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,11 @@ def __init__(self, args, continuous_training=False):
def setup_executor(self) -> None:
if self.is_continual_training is False:
# Create a single directory containing KGE and all related data
if self.args.path_to_store_single_run:
os.makedirs(self.args.path_to_store_single_run, exist_ok=True)
if self.args.path_to_store_single_run is not None:
if os.path.exists(self.args.path_to_store_single_run):
print(f"Deleting the existing directory of {self.args.path_to_store_single_run}")
os.system(f'rm -rf {self.args.path_to_store_single_run}')
os.makedirs(self.args.path_to_store_single_run, exist_ok=False)
self.args.full_storage_path = self.args.path_to_store_single_run
else:
self.args.full_storage_path = create_experiment_folder(folder_name=self.args.storage_path)
Expand All @@ -59,35 +62,6 @@ def setup_executor(self) -> None:
temp = vars(self.args)
json.dump(temp, file_descriptor, indent=3)

def dept_read_preprocess_index_serialize_data(self) -> None:
""" Read & Preprocess & Index & Serialize Input Data
(1) Read or load the data from disk into memory.
(2) Store the statistics of the data.
Parameter
----------
Return
----------
None
"""
# (2) Store the stats and share parameters
self.args.num_entities = self.knowledge_graph.num_entities
self.args.num_relations = self.knowledge_graph.num_relations
self.args.num_tokens = self.knowledge_graph.num_tokens
self.args.max_length_subword_tokens = self.knowledge_graph.max_length_subword_tokens
self.args.ordered_bpe_entities=self.knowledge_graph.ordered_bpe_entities
self.report['num_train_triples'] = len(self.knowledge_graph.train_set)
self.report['num_entities'] = self.knowledge_graph.num_entities
self.report['num_relations'] = self.knowledge_graph.num_relations
self.report['num_relations'] = self.knowledge_graph.num_relations
self.report[
'max_length_subword_tokens'] = self.knowledge_graph.max_length_subword_tokens if self.knowledge_graph.max_length_subword_tokens else None

self.report['runtime_kg_loading'] = time.time() - self.start_time

@timeit
def save_trained_model(self) -> None:
""" Save a knowledge graph embedding model
Expand All @@ -113,19 +87,17 @@ def save_trained_model(self) -> None:
self.report.update(self.trained_model.mem_of_model())
# (3) Store/Serialize Model for further use.
if self.is_continual_training is False:
store(trainer=self.trainer,
trained_model=self.trained_model,
store(trained_model=self.trained_model,
model_name='model',
full_storage_path=self.args.full_storage_path,#self.storage_path,
full_storage_path=self.args.full_storage_path,
save_embeddings_as_csv=self.args.save_embeddings_as_csv)
else:
store(trainer=self.trainer,
trained_model=self.trained_model,
model_name='model_' + str(datetime.datetime.now()),
full_storage_path=self.args.full_storage_path,#self.storage_path,
store(trained_model=self.trained_model,
model_name='model', # + str(datetime.datetime.now()),
full_storage_path=self.args.full_storage_path,
save_embeddings_as_csv=self.args.save_embeddings_as_csv)

self.report['path_experiment_folder'] = self.args.full_storage_path#self.storage_path
self.report['path_experiment_folder'] = self.args.full_storage_path
self.report['num_entities'] = self.args.num_entities
self.report['num_relations'] = self.args.num_relations

Expand Down Expand Up @@ -154,7 +126,8 @@ def end(self, form_of_labelling: str) -> dict:
self.write_report()
return {**self.report}
else:
self.evaluator.eval(dataset=self.knowledge_graph, trained_model=self.trained_model,
self.evaluator.eval(dataset=self.knowledge_graph,
trained_model=self.trained_model,
form_of_labelling=form_of_labelling)
self.write_report()
return {**self.report, **self.evaluator.report}
Expand Down Expand Up @@ -246,7 +219,7 @@ class ContinuousExecute(Execute):

def __init__(self, args):
# (1) Current input configuration.
assert os.path.exists(args.continual_learning)
assert os.path.exists(args.continual_learning), f"Path doesn't exist {args.continual_learning}"
assert os.path.isfile(args.continual_learning + '/configuration.json')
# (2) Load previous input configuration.
previous_args = load_json(args.continual_learning + '/configuration.json')
Expand Down Expand Up @@ -303,7 +276,6 @@ def continual_start(self) -> dict:
self.args.ordered_bpe_entities = None

self.trained_model, form_of_labelling = self.trainer.continual_start(knowledge_graph)

# (5) Store trained model.
self.save_trained_model()
# (6) Eval model.
Expand Down
38 changes: 24 additions & 14 deletions dicee/models/ensemble.py
Original file line number Diff line number Diff line change
@@ -1,22 +1,32 @@
import torch
import copy

from typing import List
class EnsembleKGE:
def __init__(self, seed_model):
self.models = []
self.optimizers = []
self.loss_history = []
for i in range(torch.cuda.device_count()):
i_model=copy.deepcopy(seed_model)
# TODO: Why we cant send the compile model to cpu ?
#i_model = torch.compile(i_model)
i_model.to(torch.device(f"cuda:{i}"))
self.optimizers.append(i_model.configure_optimizers())
self.models.append(i_model)
# Maybe use the original model's name ?
def __init__(self, seed_model=None,pretrained_models:List=None):

if seed_model is not None:
self.models = []
self.optimizers = []
self.loss_history = []
for i in range(torch.cuda.device_count()):
i_model=copy.deepcopy(seed_model)
# TODO: Why we cant send the compile model to cpu ?
#i_model = torch.compile(i_model)
i_model.to(torch.device(f"cuda:{i}"))
self.optimizers.append(i_model.configure_optimizers())
self.models.append(i_model)
else:
assert pretrained_models is not None
self.models = pretrained_models
self.optimizers = []
self.loss_history = []

for i in range(torch.cuda.device_count()):
self.models[i].to(torch.device(f"cuda:{i}"))
self.optimizers.append(self.models[i].configure_optimizers())
# Maybe use the original model's name ?
self.name=self.models[0].name
self.train_mode=True

def named_children(self):
return self.models[0].named_children()
@property
Expand Down
64 changes: 35 additions & 29 deletions dicee/static_funcs.py
Original file line number Diff line number Diff line change
Expand Up @@ -102,20 +102,40 @@ def select_model(args: dict, is_continual_training: bool = None, storage_path: s
assert isinstance(is_continual_training, bool)
assert isinstance(storage_path, str)
if is_continual_training:
print('Loading pre-trained model...')
model, _ = intialize_model(args)
try:
weights = torch.load(storage_path + '/model.pt', torch.device('cpu'))
model.load_state_dict(weights)
for parameter in model.parameters():
parameter.requires_grad = True
model.train()
except FileNotFoundError:
print(f"{storage_path}/model.pt is not found. The model will be trained with random weights")
return model, _
# Check whether we have tensor parallelized KGE.
files_under_storage_path = [f for f in os.listdir(storage_path) if os.path.isfile(os.path.join(storage_path, f))]
num_of_partial_models_for_tensor_parallel= len([ i for i in files_under_storage_path if "partial" in i ])
if num_of_partial_models_for_tensor_parallel >= 1:
models=[]
labelling_flag=None
for i in range(num_of_partial_models_for_tensor_parallel):
model, labelling_flag = intialize_model(args)
weights = torch.load(storage_path + f'/model_partial_{i}.pt', torch.device('cpu'),weights_only=False)
model.load_state_dict(weights)
for parameter in model.parameters():
parameter.requires_grad = True
model.train()
models.append(model)
return EnsembleKGE(pretrained_models=models), labelling_flag
else:
print('Loading pre-trained model...')
model, labelling_flag = intialize_model(args)
try:
weights = torch.load(storage_path + '/model.pt', torch.device('cpu'))
model.load_state_dict(weights)
for parameter in model.parameters():
parameter.requires_grad = True
model.train()
except FileNotFoundError as e:
print(f"{storage_path}/model.pt is not found. The model will be trained with random weights")
raise e
return model, labelling_flag
else:
return intialize_model(args)
model, labelling_flag= intialize_model(args)
if args["trainer"]=="TP":
model=EnsembleKGE(seed_model=model)

return model, labelling_flag

def load_model(path_of_experiment_folder: str, model_name='model.pt',verbose=0) -> Tuple[object, Tuple[dict, dict]]:
""" Load weights and initialize pytorch module from namespace arguments"""
Expand Down Expand Up @@ -279,32 +299,18 @@ def numpy_data_type_changer(train_set: np.ndarray, num: int) -> np.ndarray:
def save_checkpoint_model(model, path: str) -> None:
""" Store Pytorch model into disk"""
if isinstance(model, BaseKGE):
try:
torch.save(model.state_dict(), path)
except ReferenceError as e:
print(e)
print(model.name)
print('Could not save the model correctly')
torch.save(model.state_dict(), path)
elif isinstance(model, EnsembleKGE):
# path comes with ../model_...
for i, partial_model in enumerate(model):
new_path=path.replace("model.pt",f"model_partial_{i}.pt")
torch.save(partial_model.state_dict(), new_path)
else:
torch.save(model.model.state_dict(), path)


def store(trainer,
trained_model, model_name: str = 'model', full_storage_path: str = None,
def store(trained_model, model_name: str = 'model', full_storage_path: str = None,
save_embeddings_as_csv=False) -> None:
"""
Store trained_model model and save embeddings into csv file.
:param trainer: an instance of trainer class
:param full_storage_path: path to save parameters.
:param model_name: string representation of the name of the model.
:param trained_model: an instance of BaseKGE see core.models.base_model .
:param save_embeddings_as_csv: for easy access of embeddings.
:return:
"""
assert full_storage_path is not None
assert isinstance(model_name, str)
assert len(model_name) > 1
Expand Down
26 changes: 8 additions & 18 deletions dicee/static_funcs_training.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,28 +10,18 @@ def make_iterable_verbose(iterable_object, verbose, desc="Default", position=Non
return iterable_object

@torch.no_grad()
def evaluate_lp(model, triple_idx, num_entities, er_vocab: Dict[Tuple, List], re_vocab: Dict[Tuple, List],
def evaluate_lp(model=None, triple_idx=None, num_entities=None, er_vocab: Dict[Tuple, List]=None,
re_vocab: Dict[Tuple, List]=None,
info='Eval Starts', batch_size=128, chunk_size=1000):
"""
Evaluate model in a standard link prediction task
for each triple
the rank is computed by taking the mean of the filtered missing head entity rank and
the filtered missing tail entity rank
:param model:
:param triple_idx:
:param num_entities:
:param er_vocab:
:param re_vocab:
:param info:
:param batch_size:
:param chunk_size:
:return:
"""
assert model is not None, "Model must be provided"
assert triple_idx is not None, "triple_idx must be provided"
assert num_entities is not None, "num_entities must be provided"
assert er_vocab is not None, "er_vocab must be provided"
assert re_vocab is not None, "re_vocab must be provided"

model.eval()
print(info)
print(f'Num of triples {len(triple_idx)}')
print('** Evaluation with batching')
hits = dict()
reciprocal_ranks = []
# Iterate over test triples
Expand Down
7 changes: 5 additions & 2 deletions dicee/trainer/dice_trainer.py
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ class DICE_Trainer:
report:dict
"""

def __init__(self, args, is_continual_training, storage_path, evaluator=None):
def __init__(self, args, is_continual_training:bool, storage_path, evaluator=None):
self.report = dict()
self.args = args
self.trainer = None
Expand Down Expand Up @@ -170,7 +170,6 @@ def continual_start(self,knowledge_graph):
self.trainer = self.initialize_trainer(callbacks=get_callbacks(self.args))
model, form_of_labelling = self.initialize_or_load_model()
# TODO: Here we need to load memory pag

self.trainer.evaluator = self.evaluator
self.trainer.dataset = knowledge_graph
self.trainer.form_of_labelling = form_of_labelling
Expand Down Expand Up @@ -230,6 +229,8 @@ def init_dataset(self) -> torch.utils.data.Dataset:
byte_pair_encoding=self.args.byte_pair_encoding,
block_size=self.args.block_size)
else:
assert isinstance(self.trainer.dataset, np.memmap), ("Train dataset must be an instance of memmap. "
f"Currently, {type(np.memmap)}!")
if self.args.continual_learning:
path = self.args.continual_learning
else:
Expand Down Expand Up @@ -276,6 +277,8 @@ def start(self, knowledge_graph: Union[KG,np.memmap]) -> Tuple[BaseKGE, str]:
# TODO: Later, maybe we should write a callback to save the models in disk

if isinstance(self.trainer, TensorParallel):
assert isinstance(model, EnsembleKGE), type(model)

model = self.trainer.fit(model, train_dataloaders=self.init_dataloader(self.init_dataset()))
assert isinstance(model,EnsembleKGE)
else:
Expand Down
9 changes: 3 additions & 6 deletions dicee/trainer/model_parallelism.py
Original file line number Diff line number Diff line change
Expand Up @@ -134,17 +134,14 @@ def forward_backward_update_loss(z:Tuple, ensemble_model)->float:
class TensorParallel(AbstractTrainer):
def __init__(self, args, callbacks):
super().__init__(args, callbacks)
self.models=[]

def get_ensemble(self):
return self.models

def fit(self, *args, **kwargs):
""" Train model """
assert len(args) == 1
seed_model, = args
# () Init. ensemble model.
ensemble_model = EnsembleKGE(seed_model)
ensemble_model, = args
assert isinstance(ensemble_model,EnsembleKGE), (f"Selected model must "
f"be an instance of EnsembleKGE{type(ensemble_model)}")
# () Run on_fit_start callbacks.
self.on_fit_start(self, ensemble_model)
# () Sanity checking
Expand Down
Loading

0 comments on commit a766711

Please sign in to comment.