entity_merger test
arcangelo7 committed Nov 19, 2024
1 parent 141c10f commit a0d4244
Showing 10 changed files with 1,261 additions and 89 deletions.
210 changes: 126 additions & 84 deletions oc_meta/run/merge/duplicated_entities_simultaneously.py
@@ -2,106 +2,148 @@
 import concurrent.futures
 import csv
 import os
-from typing import List
+from typing import List, Dict
 
 from oc_meta.plugins.editor import MetaEditor
 from oc_ocdm.graph import GraphSet
 from rdflib import URIRef
 from tqdm import tqdm
 
 
-def get_entity_type(entity_url):
-    parts = entity_url.split('/')
-    if 'oc' in parts and 'meta' in parts:
-        try:
-            return parts[parts.index('meta') + 1]
-        except IndexError:
-            return None
-    return None
-
-def read_csv(csv_file) -> List[dict]:
-    data = []
-    with open(csv_file, mode='r', newline='', encoding='utf-8') as file:
-        csv_reader = csv.DictReader(file)
-        for row in csv_reader:
-            if 'Done' not in row:
-                row['Done'] = 'False'
-            data.append(row)
-    return data
-
-def write_csv(csv_file, data):
-    fieldnames = data[0].keys()
-    with open(csv_file, mode='w', newline='', encoding='utf-8') as file:
-        writer = csv.DictWriter(file, fieldnames=fieldnames)
-        writer.writeheader()
-        for row in data:
-            writer.writerow(row)
-
-def process_file(csv_file, meta_config, resp_agent, entity_types, stop_file_path):
-    data = read_csv(csv_file)
-    meta_editor = MetaEditor(meta_config, resp_agent, save_queries=True)
-
-    # Create a single GraphSet for all operations
-    g_set = GraphSet(meta_editor.base_iri, custom_counter_handler=meta_editor.counter_handler)
-    modified = False
-
-    for row in data:
-        if os.path.exists(stop_file_path):
-            break
-
-        entity_type = get_entity_type(row['surviving_entity'])
-        if row.get('Done') != 'True' and entity_type in entity_types:
-            surviving_entity = URIRef(row['surviving_entity'])
-            merged_entities = row['merged_entities'].split('; ')
-
-            for merged_entity in merged_entities:
-                merged_entity = merged_entity.strip()
-                try:
-                    meta_editor.merge(g_set, surviving_entity, URIRef(merged_entity))
-                except ValueError:
-                    continue
-
-            row['Done'] = 'True'
-            modified = True
-
-    # Save the changes only once at the end
-    if modified:
-        meta_editor.save(g_set)
-
-    write_csv(csv_file, data)
-    return csv_file
-
-def count_csv_rows(csv_file):
-    with open(csv_file, 'r', encoding='utf-8') as f:
-        return sum(1 for _ in f) - 1  # Subtract 1 to exclude the header row
+class EntityMerger:
+    def __init__(self, meta_config: str, resp_agent: str, entity_types: List[str], stop_file_path: str, workers: int):
+        self.meta_config = meta_config
+        self.resp_agent = resp_agent
+        self.entity_types = entity_types
+        self.stop_file_path = stop_file_path
+        self.workers = workers
+
+    @staticmethod
+    def get_entity_type(entity_url: str) -> str:
+        parts = entity_url.split('/')
+        if 'oc' in parts and 'meta' in parts:
+            try:
+                return parts[parts.index('meta') + 1]
+            except IndexError:
+                return None
+        return None
+
+    @staticmethod
+    def read_csv(csv_file: str) -> List[Dict]:
+        data = []
+        with open(csv_file, mode='r', newline='', encoding='utf-8') as file:
+            csv_reader = csv.DictReader(file)
+            for row in csv_reader:
+                if 'Done' not in row:
+                    row['Done'] = 'False'
+                data.append(row)
+        return data
+
+    @staticmethod
+    def write_csv(csv_file: str, data: List[Dict]):
+        fieldnames = data[0].keys()
+        with open(csv_file, mode='w', newline='', encoding='utf-8') as file:
+            writer = csv.DictWriter(file, fieldnames=fieldnames)
+            writer.writeheader()
+            for row in data:
+                writer.writerow(row)
+
+    @staticmethod
+    def count_csv_rows(csv_file: str) -> int:
+        with open(csv_file, 'r', encoding='utf-8') as f:
+            return sum(1 for _ in f) - 1  # Subtract 1 to exclude the header row
+
+    def process_file(self, csv_file: str) -> str:
+        """Process a single CSV file with its own MetaEditor instance"""
+        data = self.read_csv(csv_file)
+        meta_editor = MetaEditor(self.meta_config, self.resp_agent, save_queries=True)
+        modified = False
+
+        # Create a GraphSet for the current file
+        g_set = GraphSet(meta_editor.base_iri, custom_counter_handler=meta_editor.counter_handler)
+
+        for row in data:
+            if os.path.exists(self.stop_file_path):
+                break
+
+            entity_type = self.get_entity_type(row['surviving_entity'])
+            if row.get('Done') != 'True' and entity_type in self.entity_types:
+                surviving_entity = URIRef(row['surviving_entity'])
+                merged_entities = row['merged_entities'].split('; ')
+
+                for merged_entity in merged_entities:
+                    merged_entity = merged_entity.strip()
+                    try:
+                        meta_editor.merge(g_set, surviving_entity, URIRef(merged_entity))
+                    except ValueError:
+                        continue
+
+                row['Done'] = 'True'
+                modified = True
+
+        if modified:
+            meta_editor.save(g_set)
+            self.write_csv(csv_file, data)
+
+        return csv_file
+
+    def process_folder(self, csv_folder: str):
+        """Process all CSV files in a folder using parallel processing"""
+        if os.path.exists(self.stop_file_path):
+            os.remove(self.stop_file_path)
+
+        csv_files = [os.path.join(csv_folder, file)
+                     for file in os.listdir(csv_folder)
+                     if file.endswith('.csv')]
+
+        # Filter CSV files based on number of rows and workers
+        if self.workers > 4:
+            csv_files = [file for file in csv_files
+                         if self.count_csv_rows(file) <= 10000]
+
+        with concurrent.futures.ProcessPoolExecutor(max_workers=self.workers) as executor:
+            futures = {
+                executor.submit(self.process_file, csv_file): csv_file
+                for csv_file in csv_files
+            }
+
+            for future in tqdm(concurrent.futures.as_completed(futures),
+                               total=len(futures),
+                               desc="Overall Progress"):
+                try:
+                    future.result()
+                except Exception as e:
+                    print(f"Error processing file: {e}")
+
 
 def main():
     parser = argparse.ArgumentParser(description="Merge entities from CSV files in a folder.")
-    parser.add_argument('csv_folder', type=str, help="Path to the folder containing CSV files")
-    parser.add_argument('meta_config', type=str, help="Meta configuration string")
-    parser.add_argument('resp_agent', type=str, help="Responsible agent string")
-    parser.add_argument('--entity_types', nargs='+', default=['ra', 'br', 'id'], help="Types of entities to merge (ra, br, id)")
-    parser.add_argument('--stop_file', type=str, default="stop.out", help="Path to the stop file")
-    parser.add_argument('--workers', type=int, default=4, help="Number of parallel workers")
+    parser.add_argument('csv_folder', type=str,
+                        help="Path to the folder containing CSV files")
+    parser.add_argument('meta_config', type=str,
+                        help="Meta configuration string")
+    parser.add_argument('resp_agent', type=str,
+                        help="Responsible agent string")
+    parser.add_argument('--entity_types', nargs='+',
+                        default=['ra', 'br', 'id'],
+                        help="Types of entities to merge (ra, br, id)")
+    parser.add_argument('--stop_file', type=str,
+                        default="stop.out",
+                        help="Path to the stop file")
+    parser.add_argument('--workers', type=int,
+                        default=4,
+                        help="Number of parallel workers")
 
     args = parser.parse_args()
 
-    if os.path.exists(args.stop_file):
-        os.remove(args.stop_file)
-
-    csv_files = [os.path.join(args.csv_folder, file) for file in os.listdir(args.csv_folder) if file.endswith('.csv')]
-
-    # Filter the CSV files based on the number of rows and the number of workers
-    if args.workers > 4:
-        csv_files = [file for file in csv_files if count_csv_rows(file) <= 10000]
-
-    with concurrent.futures.ProcessPoolExecutor(max_workers=args.workers) as executor:
-        futures = {executor.submit(process_file, csv_file, args.meta_config, args.resp_agent, args.entity_types, args.stop_file): csv_file for csv_file in csv_files}
-
-        for future in tqdm(concurrent.futures.as_completed(futures), total=len(futures), desc="Overall Progress"):
-            try:
-                future.result()
-            except Exception as e:
-                print(f"Error processing file: {e}")
+    merger = EntityMerger(
+        meta_config=args.meta_config,
+        resp_agent=args.resp_agent,
+        entity_types=args.entity_types,
+        stop_file_path=args.stop_file,
+        workers=args.workers
+    )
+
+    merger.process_folder(args.csv_folder)
 
 
 if __name__ == "__main__":
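
For reference, the input these scripts consume can be inferred from read_csv and process_file: each CSV needs a surviving_entity column, a merged_entities column listing the URIs to fold into it separated by '; ', and an optional Done column used to resume interrupted runs (it is added automatically when missing). A minimal sketch of one such file (the Meta URIs below are illustrative placeholders, not real entities):

surviving_entity,merged_entities,Done
https://w3id.org/oc/meta/ra/061,https://w3id.org/oc/meta/ra/062; https://w3id.org/oc/meta/ra/063,False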
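The refactor also makes the merge routine importable as a library rather than reachable only through the CLI in main. A minimal usage sketch, assuming the module path from the diff header and placeholder values for the configuration path and responsible agent:

from oc_meta.run.merge.duplicated_entities_simultaneously import EntityMerger

merger = EntityMerger(
    meta_config="meta_config.yaml",  # placeholder: path to the Meta configuration
    resp_agent="https://orcid.org/0000-0000-0000-0000",  # placeholder agent URI
    entity_types=["ra", "br", "id"],
    stop_file_path="stop.out",
    workers=4
)
merger.process_folder("csv_folder")  # placeholder folder containing the merge CSVs

Submitting the bound method self.process_file to the ProcessPoolExecutor works because EntityMerger carries only picklable state (four strings and an int), so every worker process rebuilds its own MetaEditor and GraphSet, as the docstring notes. Creating the stop file while a run is in progress halts each worker between rows, and because the Done flags are written back per file, a later run resumes where it stopped (process_folder clears a leftover stop file on start).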
8 changes: 4 additions & 4 deletions poetry.lock

Some generated files are not rendered by default.

2 changes: 1 addition & 1 deletion pyproject.toml
@@ -27,7 +27,7 @@ oc-ds-converter = "^1.0.4"
 ijson = "^3.2.3"
 internetarchive = "^3.7.0"
 zenodopy = "^0.3.0"
-oc-ocdm = "^9.1.2"
+oc-ocdm = "9.2.0"
 retrying = "^1.3.4"
 orjson = "^3.10.7"
 rdflib-ocdm = "0.3.11"
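
One detail worth noting in pyproject.toml: the caret constraint ^9.1.2 (which lets Poetry pick any compatible 9.x release) is replaced by an exact pin on 9.2.0, so the resolver will no longer upgrade oc-ocdm on its own.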