Commit: Merge 8e5b840 into 2eb5db4

Demirrr authored Oct 23, 2024
2 parents 2eb5db4 + 8e5b840 commit 72e9508
Showing 14 changed files with 180 additions and 138 deletions.
3 changes: 3 additions & 0 deletions dicee/config.py
@@ -50,6 +50,9 @@ def __init__(self, **kwargs):
self.backend: str = "pandas"
"""Backend to read, process, and index input knowledge graph. pandas, polars and rdflib available"""

self.separator: str = "\s+"
"""separator for extracting head, relation and tail from a triple"""

self.trainer: str = 'torchCPUTrainer'
"""Trainer for knowledge graph embedding model"""

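The new separator option defaults to the regex "\s+", i.e. the whitespace split that read_with_pandas previously hard-coded. A minimal sketch of what that setting means when a triple file is parsed; the file path and the direct pandas call are only illustrative, not part of this commit:

import pandas as pd

# Hypothetical whitespace-separated triple file, e.g. KGs/UMLS/train.txt:
#   acquired_abnormality    location_of    experimental_model_of_disease
df = pd.read_csv("KGs/UMLS/train.txt",   # example path
                 sep=r"\s+",             # default value of the new separator option
                 header=None,
                 usecols=[0, 1, 2],
                 names=["subject", "relation", "object"],
                 dtype=str)
print(df.head())

Setting args.separator to "\t", or any other pattern pandas accepts, lets the same pipeline ingest tab-separated dumps without touching the reader code.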
7 changes: 2 additions & 5 deletions dicee/dataset_classes.py
@@ -244,9 +244,7 @@ class OnevsAllDataset(torch.utils.data.Dataset):

def __init__(self, train_set_idx: np.ndarray, entity_idxs):
super().__init__()
assert isinstance(train_set_idx, np.memmap)

assert isinstance(train_set_idx, np.ndarray)
assert isinstance(train_set_idx, np.memmap) or isinstance(train_set_idx, np.ndarray)
assert len(train_set_idx) > 0
self.train_data = train_set_idx
self.target_dim = len(entity_idxs)
@@ -300,8 +298,7 @@ def __init__(self, train_set_idx: np.ndarray, entity_idxs, relation_idxs, form,
label_smoothing_rate: float = 0.0):
super().__init__()
assert len(train_set_idx) > 0
assert isinstance(train_set_idx, np.memmap)
assert isinstance(train_set_idx, np.ndarray)
assert isinstance(train_set_idx, np.memmap) or isinstance(train_set_idx, np.ndarray)
self.train_data = None
self.train_target = None
self.label_smoothing_rate = torch.tensor(label_smoothing_rate)
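The two relaxed assertions above accept either a plain in-memory array or a memory-mapped one. A small sketch of why both variants pass; the file name and shapes are throwaway examples:

import numpy as np

in_memory = np.array([[0, 0, 1], [1, 1, 2]], dtype=np.int64)
mapped = np.memmap("demo_train.bin", dtype=np.int64, mode="w+", shape=(2, 3))
mapped[:] = in_memory

for train_set_idx in (in_memory, mapped):
    # np.memmap subclasses np.ndarray, so the second clause already covers both;
    # spelling out the OR documents that memory-mapped training sets are now expected.
    assert isinstance(train_set_idx, np.memmap) or isinstance(train_set_idx, np.ndarray)
    assert len(train_set_idx) > 0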
92 changes: 64 additions & 28 deletions dicee/executer.py
@@ -6,13 +6,13 @@
import os
import datetime
from pytorch_lightning import seed_everything

from dicee.knowledge_graph import KG
from dicee.evaluator import Evaluator
from .knowledge_graph import KG
from .evaluator import Evaluator
# Avoid
from dicee.static_preprocess_funcs import preprocesses_input_args
from dicee.trainer import DICE_Trainer
from dicee.static_funcs import timeit, continual_training_setup_executor, read_or_load_kg, load_json, store
from .static_preprocess_funcs import preprocesses_input_args
from .trainer import DICE_Trainer
from .static_funcs import timeit, read_or_load_kg, load_json, store, create_experiment_folder
import numpy as np

logging.getLogger('pytorch_lightning').setLevel(0)
warnings.filterwarnings(action="ignore", category=DeprecationWarning)
@@ -33,21 +33,40 @@ def __init__(self, args, continuous_training=False):
seed_everything(args.random_seed, workers=True)
# (3) Set the continual training flag
self.is_continual_training = continuous_training
# TODO: self.storage_path vs self.args.full_storage_path. One of them is redundant.
self.storage_path=None
# (4) Create an experiment folder or use the previous one
continual_training_setup_executor(self)
self.continual_training_setup_executor()
# (5) A variable is initialized for pytorch lightning trainer or DICE_Trainer()
self.trainer = None
self.trained_model = None
# (6) A variable is initialized for storing input data.
self.knowledge_graph = None
# (7) Store few data in memory for numerical results, e.g. runtime, H@1 etc.
self.report = dict()
# (8) Create an object to carry out link prediction evaluations
self.evaluator = None # e.g. Evaluator(self)
# (8) Create an object to carry out link prediction evaluations, e.g. Evaluator(self)
self.evaluator = None
# (9) Execution start time
self.start_time = None

def read_or_load_kg(self):
def continual_training_setup_executor(self) -> None:
if self.is_continual_training:
# () We continue the training, then we store new models on previous path.
self.storage_path = self.args.full_storage_path
else:
# Create a single directory containing KGE and all related data
if self.args.path_to_store_single_run:
os.makedirs(self.args.path_to_store_single_run, exist_ok=True)
self.args.full_storage_path = self.args.path_to_store_single_run
else:
# Create a parent and subdirectory.
self.args.full_storage_path = create_experiment_folder(folder_name=self.args.storage_path)
self.storage_path = self.args.full_storage_path
with open(self.args.full_storage_path + '/configuration.json', 'w') as file_descriptor:
temp = vars(self.args)
json.dump(temp, file_descriptor, indent=3)

def dept_read_or_load_kg(self):
print('*** Read or Load Knowledge Graph ***')
start_time = time.time()
kg = KG(dataset_dir=self.args.dataset_dir,
@@ -84,9 +103,6 @@ def read_preprocess_index_serialize_data(self) -> None:
None
"""
# (1) Read & Preprocess & Index & Serialize Input Data.
self.knowledge_graph = self.read_or_load_kg()

# (2) Store the stats and share parameters
self.args.num_entities = self.knowledge_graph.num_entities
self.args.num_relations = self.knowledge_graph.num_relations
@@ -102,19 +118,6 @@ def read_preprocess_index_serialize_data(self) -> None:

self.report['runtime_kg_loading'] = time.time() - self.start_time

def load_indexed_data(self) -> None:
""" Load the indexed data from disk into memory
Parameter
----------
Return
----------
None
"""
self.knowledge_graph = read_or_load_kg(self.args, cls=KG)

@timeit
def save_trained_model(self) -> None:
""" Save a knowledge graph embedding model
@@ -188,6 +191,7 @@ def end(self, form_of_labelling: str) -> dict:

def write_report(self) -> None:
""" Report training related information in a report.json file """
# @TODO: Move to static funcs
# Report total runtime.
self.report['Runtime'] = time.time() - self.start_time
print(f"Total Runtime: {self.report['Runtime']:.3f} seconds")
@@ -215,7 +219,39 @@ def start(self) -> dict:
print(f"Start time:{datetime.datetime.now()}")
# (1) Loading the Data
# Load the indexed data from disk or read a raw data from disk into knowledge_graph attribute
self.load_indexed_data() if self.is_continual_training else self.read_preprocess_index_serialize_data()
if self.args.path_to_store_single_run and os.path.exists(self.args.path_to_store_single_run+"/memory_map_train_set.npy"):
# Read the JSON file
with open(self.args.path_to_store_single_run+'/memory_map_details.json', 'r') as file_descriptor:
memory_map_details = json.load(file_descriptor)
self.knowledge_graph = np.memmap(self.args.path_to_store_single_run + '/memory_map_train_set.npy',
mode='r',
dtype=memory_map_details["dtype"],
shape=tuple(memory_map_details["shape"]))
#
self.args.num_entities = memory_map_details["num_entities"]
self.args.num_relations = memory_map_details["num_relations"]
self.args.num_tokens = None
self.args.max_length_subword_tokens = None
self.args.ordered_bpe_entities = None
else:
self.knowledge_graph = read_or_load_kg(self.args, cls=KG)
if self.is_continual_training is False:
self.args.num_entities = self.knowledge_graph.num_entities
self.args.num_relations = self.knowledge_graph.num_relations
self.args.num_tokens = self.knowledge_graph.num_tokens
self.args.max_length_subword_tokens = self.knowledge_graph.max_length_subword_tokens
self.args.ordered_bpe_entities = self.knowledge_graph.ordered_bpe_entities
self.report['num_train_triples'] = len(self.knowledge_graph.train_set)
self.report['num_entities'] = self.knowledge_graph.num_entities
self.report['num_relations'] = self.knowledge_graph.num_relations
self.report['max_length_subword_tokens'] = self.knowledge_graph.max_length_subword_tokens if self.knowledge_graph.max_length_subword_tokens else None
self.report['runtime_kg_loading'] = time.time() - self.start_time
data={"shape":tuple(self.knowledge_graph.train_set.shape),
"dtype":self.knowledge_graph.train_set.dtype.str,
"num_entities":self.knowledge_graph.num_entities,
"num_relations":self.knowledge_graph.num_relations}
with open(self.args.full_storage_path + '/memory_map_details.json', 'w') as file_descriptor:
json.dump(data, file_descriptor, indent=4)
# (2) Create an evaluator object.
self.evaluator = Evaluator(args=self.args)
# (3) Create a trainer object.
@@ -292,4 +328,4 @@ def continual_start(self) -> dict:
else:
self.evaluator = Evaluator(args=self.args, is_continual_training=True)
self.evaluator.dummy_eval(self.trained_model, form_of_labelling)
return {**self.report, **self.evaluator.report}
return {**self.report, **self.evaluator.report}
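The enlarged start() above pairs memory_map_train_set.npy with a small memory_map_details.json so that a later run pointed at the same path_to_store_single_run can reopen the indexed training set instead of re-reading the raw triples. A rough, self-contained sketch of that round trip; directory name, shapes and counts are made up:

import json, os
import numpy as np

run_dir = "Experiments/demo_run"          # stands in for args.path_to_store_single_run
os.makedirs(run_dir, exist_ok=True)

# Pretend this is the integer-indexed training set produced by preprocessing.
train = np.random.randint(0, 100, size=(1000, 3), dtype=np.int64)
mm = np.memmap(run_dir + "/memory_map_train_set.npy", dtype=train.dtype, mode="w+", shape=train.shape)
mm[:] = train
mm.flush()

# What the new branch of start() stores next to the memory map:
details = {"shape": list(train.shape), "dtype": train.dtype.str,
           "num_entities": 100, "num_relations": 100}
with open(run_dir + "/memory_map_details.json", "w") as f:
    json.dump(details, f, indent=4)

# A later run reopens the training set read-only, without touching the raw data:
with open(run_dir + "/memory_map_details.json") as f:
    details = json.load(f)
reopened = np.memmap(run_dir + "/memory_map_train_set.npy", mode="r",
                     dtype=details["dtype"], shape=tuple(details["shape"]))
assert (reopened == train).all()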
18 changes: 9 additions & 9 deletions dicee/knowledge_graph.py
@@ -3,8 +3,6 @@
import sys
import pandas as pd
import polars as pl
import numpy as np
from .read_preprocess_save_load_kg.util import load_numpy_ndarray
class KG:
""" Knowledge Graph """

@@ -18,7 +16,7 @@ def __init__(self, dataset_dir: str = None,
add_reciprocal: bool = None, eval_model: str = None,
read_only_few: int = None, sample_triples_ratio: float = None,
path_for_serialization: str = None,
entity_to_idx=None, relation_to_idx=None, backend=None, training_technique: str = None):
entity_to_idx=None, relation_to_idx=None, backend=None, training_technique: str = None, separator:str=None):
"""
:param dataset_dir: A path of a folder containing train.txt, valid.txt, test.text
:param byte_pair_encoding: Apply Byte pair encoding.
@@ -36,13 +34,14 @@ def __init__(self, dataset_dir: str = None,
:param training_technique
"""
self.dataset_dir = dataset_dir
self.sparql_endpoint = sparql_endpoint
self.path_single_kg = path_single_kg

self.byte_pair_encoding = byte_pair_encoding
self.ordered_shaped_bpe_tokens = None
self.sparql_endpoint = sparql_endpoint
self.add_noise_rate = add_noise_rate
self.num_entities = None
self.num_relations = None
self.path_single_kg = path_single_kg
self.path_for_deserialization = path_for_deserialization
self.add_reciprocal = add_reciprocal
self.eval_model = eval_model
@@ -72,6 +71,7 @@ def __init__(self, dataset_dir: str = None,
self.target_dim = None
self.train_target_indices = None
self.ordered_bpe_entities = None
self.separator=separator

if self.path_for_deserialization is None:
# Read a knowledge graph into memory
@@ -83,8 +83,8 @@
else:
LoadSaveToDisk(kg=self).load()
assert len(self.train_set) > 0, "Training set is empty"
self._describe()

self.description_of_input=None
self.describe()
if self.entity_to_idx is not None:
assert isinstance(self.entity_to_idx, dict) or isinstance(self.entity_to_idx,
pl.DataFrame), f"entity_to_idx must be a dict or a polars DataFrame: {type(self.entity_to_idx)}"
@@ -96,8 +96,8 @@ def __init__(self, dataset_dir: str = None,
print(f"No inverse mapping created as self.entity_to_idx is not a type of dictionary but {type(self.entity_to_idx)}\n"
f"Backend might be selected as polars")

def _describe(self) -> None:
self.description_of_input = f'\n------------------- Description of Dataset {self.dataset_dir} -------------------'
def describe(self) -> None:
self.description_of_input = f'\n------------------- Description of Dataset {self.dataset_dir if isinstance(self.dataset_dir, str) else self.sparql_endpoint if isinstance(self.sparql_endpoint, str) else self.path_single_kg} -------------------'
if self.byte_pair_encoding:
self.description_of_input += f'\nNumber of tokens:{self.num_tokens}' \
f'\nNumber of max sequence of sub-words: {self.max_length_subword_tokens}' \
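_describe is renamed to describe, and the header line now falls back from dataset_dir to sparql_endpoint to path_single_kg, so single-file and SPARQL inputs no longer print None as the dataset name. A condensed sketch of that selection logic with example values:

dataset_dir = None
sparql_endpoint = None
path_single_kg = "KGs/Family/family.owl"   # example single-file input

source = (dataset_dir if isinstance(dataset_dir, str)
          else sparql_endpoint if isinstance(sparql_endpoint, str)
          else path_single_kg)
print(f"------------------- Description of Dataset {source} -------------------")
# ------------------- Description of Dataset KGs/Family/family.owl -------------------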
9 changes: 5 additions & 4 deletions dicee/read_preprocess_save_load_kg/read_from_disk.py
@@ -24,11 +24,12 @@ def start(self) -> None:
-------
None
"""

if self.kg.path_single_kg is not None:
self.kg.raw_train_set = read_from_disk(self.kg.path_single_kg,
self.kg.read_only_few,
self.kg.sample_triples_ratio,
backend=self.kg.backend)
backend=self.kg.backend,separator=self.kg.separator)
if self.kg.add_noise_rate:
self.add_noisy_triples_into_training()
self.kg.raw_valid_set = None
@@ -41,13 +42,13 @@ def start(self) -> None:
for i in glob.glob(self.kg.dataset_dir + '/*'):
if 'train' in i:
self.kg.raw_train_set = read_from_disk(i, self.kg.read_only_few, self.kg.sample_triples_ratio,
backend=self.kg.backend)
backend=self.kg.backend, separator=self.kg.separator)
if self.kg.add_noise_rate:
self.add_noisy_triples_into_training()
elif 'test' in i and self.kg.eval_model is not None:
self.kg.raw_test_set = read_from_disk(i, backend=self.kg.backend)
self.kg.raw_test_set = read_from_disk(i, backend=self.kg.backend, separator=self.kg.separator)
elif 'valid' in i and self.kg.eval_model is not None:
self.kg.raw_valid_set = read_from_disk(i, backend=self.kg.backend)
self.kg.raw_valid_set = read_from_disk(i, backend=self.kg.backend, separator=self.kg.separator)
else:
print(f'Not processed data: {i}')
else:
29 changes: 13 additions & 16 deletions dicee/read_preprocess_save_load_kg/util.py
@@ -109,8 +109,9 @@ def timeit_wrapper(*args, **kwargs):


@timeit
def read_with_polars(data_path, read_only_few: int = None, sample_triples_ratio: float = None) -> polars.DataFrame:
def read_with_polars(data_path, read_only_few: int = None, sample_triples_ratio: float = None, separator:str=None) -> polars.DataFrame:
""" Load and Preprocess via Polars """
assert separator is not None, "separator cannot be None"
print(f'*** Reading {data_path} with Polars ***')
# (1) Load the data.
df = polars.read_csv(data_path,
@@ -120,7 +121,7 @@ def read_with_polars(data_path, read_only_few: int = None, sample_triples_ratio:
columns=[0, 1, 2],
dtypes=[polars.String],
new_columns=['subject', 'relation', 'object'],
separator=" ") # \s+ doesn't work for polars
separator=separator) # \s+ doesn't work for polars
# (2) Sample from (1).
if sample_triples_ratio:
print(f'Subsampling {sample_triples_ratio} of input data {df.shape}...')
@@ -135,13 +136,13 @@ def read_with_polars(data_path, read_only_few: int = None, sample_triples_ratio:


@timeit
def read_with_pandas(data_path, read_only_few: int = None, sample_triples_ratio: float = None):
def read_with_pandas(data_path, read_only_few: int = None, sample_triples_ratio: float = None,separator:str=None):
assert separator is not None, "separator cannot be None"
print(f'*** Reading {data_path} with Pandas ***')
if data_path[-3:] in [".nt","ttl", 'txt', 'csv', 'zst']:
print('Reading with pandas.read_csv with sep ** s+ ** ...')
# TODO: if byte_pair_encoding=True, we should not use "\s+" as seperator I guess
df = pd.read_csv(data_path,
sep="\s+",
sep=separator,#"\s+",
header=None,
nrows=None if read_only_few is None else read_only_few,
usecols=[0, 1, 2],
@@ -171,29 +172,25 @@ def read_with_pandas(data_path, read_only_few: int = None, sample_triples_ratio:


def read_from_disk(data_path: str, read_only_few: int = None,
sample_triples_ratio: float = None, backend=None):
assert backend
sample_triples_ratio: float = None, backend:str=None,separator:str=None):
assert backend is not None, "backend cannot be None"
assert separator is not None, f"separator cannot be None. Currently {separator}"
# If path exits
if glob.glob(data_path):
# (1) Detect data format
dformat = data_path[data_path.find(".") + 1:]
if dformat in ["ttl", "owl", "turtle", "rdf/xml"] and backend != "rdflib":
raise RuntimeError(
f"Data with **{dformat}** format cannot be read via --backend pandas or polars. Use --backend rdflib")

if backend == 'pandas':
return read_with_pandas(data_path, read_only_few, sample_triples_ratio)
return read_with_pandas(data_path, read_only_few, sample_triples_ratio, separator)
elif backend == 'polars':
return read_with_polars(data_path, read_only_few, sample_triples_ratio)
return read_with_polars(data_path, read_only_few, sample_triples_ratio, separator)
elif backend == "rdflib":
# Lazy import
from rdflib import Graph

try:
assert dformat in ["ttl", "owl", "nt", "turtle", "rdf/xml", "n3", " n-triples"]
except AssertionError:
raise AssertionError(f"--backend {backend} and dataformat **{dformat}** is not matching. "
f"Use --backend pandas")
assert dformat in ["ttl", "owl", "nt", "turtle", "rdf/xml", "n3", " n-triples"],\
f"--backend {backend} and dataformat **{dformat}** is not matching. Use --backend pandas"
return pd.DataFrame(data=[(str(s), str(p), str(o)) for s, p, o in Graph().parse(data_path)],
columns=['subject', 'relation', 'object'], dtype=str)
else:
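Both readers now take the separator from the caller, and the retained inline comment still matters: pandas accepts a regular expression such as "\s+", while polars.read_csv expects a single-character separator. A small sketch of the difference, writing a throwaway tab-separated file first:

import pandas as pd
import polars

# Tiny tab-separated triple file, created only for this demonstration.
with open("demo_triples.tsv", "w") as f:
    f.write("a\tknows\tb\nb\tknows\tc\n")

# pandas: the separator may be a regex, so the default "\s+" works as well as "\t".
pdf = pd.read_csv("demo_triples.tsv", sep=r"\s+", header=None,
                  usecols=[0, 1, 2], names=["subject", "relation", "object"], dtype=str)

# polars: the separator must be a single character; passing "\s+" would raise an error.
pldf = polars.read_csv("demo_triples.tsv", has_header=False, separator="\t",
                       new_columns=["subject", "relation", "object"])

print(pdf.shape, pldf.shape)   # (2, 3) (2, 3)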
17 changes: 4 additions & 13 deletions dicee/sanity_checkers.py
@@ -80,20 +80,11 @@ def validate_knowledge_graph(args):


def sanity_checking_with_arguments(args):
try:
assert args.embedding_dim > 0
except AssertionError:
raise AssertionError(f'embedding_dim must be strictly positive. Currently:{args.embedding_dim}')

if args.scoring_technique not in ["AllvsAll", "1vsSample", "KvsSample","KvsAll", "NegSample", "1vsAll", "Pyke", "Sentence"]:
raise KeyError(f'Invalid training strategy => {args.scoring_technique}.')

assert args.learning_rate > 0
assert args.embedding_dim > 0,f"embedding_dim must be strictly positive. Currently:{args.embedding_dim}"
assert args.scoring_technique in ["AllvsAll", "1vsSample", "KvsSample","KvsAll", "NegSample", "1vsAll","Pyke", "Sentence"], f"Invalid training strategy => {args.scoring_technique}."
assert args.learning_rate > 0, f"Learning rate must be greater than 0. Currently:{args.learning_rate}"
if args.num_folds_for_cv is None:
args.num_folds_for_cv = 0
try:
assert args.num_folds_for_cv >= 0
except AssertionError:
raise AssertionError(f'num_folds_for_cv can not be negative. Currently:{args.num_folds_for_cv}')
assert args.num_folds_for_cv >= 0,f"num_folds_for_cv can not be negative. Currently:{args.num_folds_for_cv}"
validate_knowledge_graph(args)
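The reworked checks collapse each try/except-and-re-raise block into a single assert that carries its message; behaviour is unchanged, just shorter. A minimal standalone illustration, using a bare variable instead of the real args object:

embedding_dim = 32   # set to 0 to see both variants raise the same AssertionError

# Old pattern: bare assert, caught and re-raised with a message.
try:
    assert embedding_dim > 0
except AssertionError:
    raise AssertionError(f"embedding_dim must be strictly positive. Currently:{embedding_dim}")

# New pattern: the message lives on the assert itself.
assert embedding_dim > 0, f"embedding_dim must be strictly positive. Currently:{embedding_dim}"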
