prov fix log files
arcangelo7 committed Oct 30, 2024
1 parent d3c4291 commit f96e3fd
Showing 2 changed files with 106 additions and 118 deletions.
oc_meta/run/fixer/prov/fix.py (216 changes: 102 additions, 114 deletions)
@@ -2,11 +2,10 @@
 import os
 import re
 import zipfile
-from collections import defaultdict
-from dataclasses import dataclass, field
+import logging
 from datetime import datetime, timezone
 from multiprocessing import cpu_count
-from typing import Dict, List, Optional, Set, Tuple
+from typing import Optional, List, Tuple
 from zoneinfo import ZoneInfo
 
 from pebble import ProcessPool
@@ -16,21 +15,23 @@
 
 PROV = Namespace("http://www.w3.org/ns/prov#")
 
-@dataclass
-class SnapshotInfo:
-    uri: URIRef
-    number: int
-    generation_times: List[Literal]
-    invalidation_times: List[Literal]
-
-@dataclass
-class ModificationTracker:
-    modifications: Dict[str, Dict[str, List[str]]] = field(default_factory=lambda: {})
+def setup_logging(log_dir: str) -> None:
+    """Setup logging configuration for both processing and modifications logs."""
+    os.makedirs(log_dir, exist_ok=True)
+
+    # Setup processing logger
+    process_logger = logging.getLogger('processing')
+    process_logger.setLevel(logging.INFO)
+    process_handler = logging.FileHandler(os.path.join(log_dir, 'processing.log'))
+    process_handler.setFormatter(logging.Formatter('%(asctime)s - %(message)s'))
+    process_logger.addHandler(process_handler)
 
-    def add_modification(self, entity_uri: str, mod_type: str, message: str) -> None:
-        if entity_uri not in self.modifications:
-            self.modifications[entity_uri] = defaultdict(list)
-        self.modifications[entity_uri][mod_type].append(message)
+    # Setup modifications logger
+    mod_logger = logging.getLogger('modifications')
+    mod_logger.setLevel(logging.INFO)
+    mod_handler = logging.FileHandler(os.path.join(log_dir, 'modifications.log'))
+    mod_handler.setFormatter(logging.Formatter('%(asctime)s - %(message)s'))
+    mod_logger.addHandler(mod_handler)
 
 class ProvenanceProcessor:
     def __init__(self):
@@ -39,8 +40,12 @@ def __init__(self):
             datetime(2022, 12, 20, tzinfo=timezone.utc).isoformat(),
             datatype=XSD.dateTime
         )
-        self.tracker = ModificationTracker()
+        self.logger = logging.getLogger('modifications')
+
+    def _log_modification(self, entity_uri: str, mod_type: str, message: str) -> None:
+        """Log a modification in real-time."""
+        self.logger.info(f"Entity: {entity_uri} - {mod_type} - {message}")
 
     def _extract_snapshot_number(self, snapshot_uri: str) -> int:
         """Extract the snapshot number from its URI using pre-compiled regex."""
         match = self._snapshot_number_pattern.search(str(snapshot_uri))
@@ -67,30 +72,26 @@ def _normalize_timestamp(self, literal: Literal) -> Tuple[Literal, bool]:
             new_literal = Literal(dt.isoformat(), datatype=XSD.dateTime)
             return new_literal, str(new_literal) != str(literal)
         except Exception as e:
-            print(f"Error normalizing timestamp {literal}: {e}")
+            self.logger.error(f"Error normalizing timestamp {literal}: {e}")
             return literal, False
 
-    def _collect_snapshot_info(self, context: ConjunctiveGraph) -> List[SnapshotInfo]:
+    def _collect_snapshot_info(self, context: ConjunctiveGraph) -> List[dict]:
         """Collect all snapshot information in a single pass."""
         snapshots = []
-        seen_uris: Set[str] = set()
+        seen_uris = set()
 
         for s, p, o in context.triples((None, None, None)):
             if '/prov/se/' in str(s) and str(s) not in seen_uris:
-                generation_times = list(context.objects(s, PROV.generatedAtTime, unique=True))
-                invalidation_times = list(context.objects(s, PROV.invalidatedAtTime, unique=True))
-
-                snapshot = SnapshotInfo(
-                    uri=s,
-                    number=self._extract_snapshot_number(s),
-                    generation_times=generation_times,
-                    invalidation_times=invalidation_times
-                )
-
+                snapshot = {
+                    'uri': s,
+                    'number': self._extract_snapshot_number(s),
+                    'generation_times': list(context.objects(s, PROV.generatedAtTime, unique=True)),
+                    'invalidation_times': list(context.objects(s, PROV.invalidatedAtTime, unique=True))
+                }
                 snapshots.append(snapshot)
                 seen_uris.add(str(s))
 
-        return sorted(snapshots, key=lambda x: x.number)
+        return sorted(snapshots, key=lambda x: x['number'])
 
     def _remove_multiple_timestamps(self, context: ConjunctiveGraph,
                                   snapshot_uri: URIRef,
@@ -99,20 +100,21 @@ def _remove_multiple_timestamps(self, context: ConjunctiveGraph,
         """Remove all timestamps for a given predicate."""
         for ts in timestamps:
             context.remove((snapshot_uri, predicate, ts))
-            self.tracker.add_modification(
+            self._log_modification(
                 str(snapshot_uri),
                 f"Removed {predicate.split('#')[-1]}",
-                f"{str(snapshot_uri)}: {str(ts)}"
+                f"{str(ts)}"
             )
 
     @staticmethod
-    def process_file(prov_file_path: str) -> Optional[Tuple[str, Dict]]:
+    def process_file(prov_file_path: str) -> Optional[bool]:
         """Process a single provenance file with optimized operations."""
-        processor = ProvenanceProcessor()  # Create new instance for each file
-        try:
+        process_logger = logging.getLogger('processing')
+        processor = ProvenanceProcessor()
+
+        try:
             with zipfile.ZipFile(prov_file_path, 'r') as zip_ref:
                 g = ConjunctiveGraph()
-
                 for filename in zip_ref.namelist():
                     with zip_ref.open(filename) as file:
                         g.parse(file, format='json-ld')
@@ -136,26 +138,27 @@ def process_file(prov_file_path: str) -> Optional[Tuple[str, Dict]]:
                                              ensure_ascii=False, indent=None)
                         zip_out.writestr('se.json', jsonld_data)
 
-                    return str(prov_file_path), processor.tracker.modifications
+                    return True
+
+            return False
 
         except Exception as e:
-            print(f"Error processing {prov_file_path}: {e}")
-
-        return None
+            process_logger.error(f"Error processing {prov_file_path}: {e}")
+            return None
 
     def _process_snapshots(self, context: ConjunctiveGraph, entity_uri: URIRef,
-                          snapshots: List[SnapshotInfo]) -> bool:
+                          snapshots: List[dict]) -> bool:
         """Process all snapshots in batch operations."""
         modified = False
 
         # Process specializationOf relationships
         for snapshot in snapshots:
-            if not any(context.objects(snapshot.uri, PROV.specializationOf)):
-                context.add((snapshot.uri, PROV.specializationOf, entity_uri))
-                self.tracker.add_modification(
+            if not any(context.objects(snapshot['uri'], PROV.specializationOf)):
+                context.add((snapshot['uri'], PROV.specializationOf, entity_uri))
+                self._log_modification(
                     str(entity_uri),
                     "Added specializationOf",
-                    str(snapshot.uri)
+                    str(snapshot['uri'])
                 )
                 modified = True
 
@@ -164,12 +167,12 @@ def _process_snapshots(self, context: ConjunctiveGraph, entity_uri: URIRef,
             curr_snapshot = snapshots[i]
             prev_snapshot = snapshots[i-1]
 
-            if not any(context.objects(curr_snapshot.uri, PROV.wasDerivedFrom)):
-                context.add((curr_snapshot.uri, PROV.wasDerivedFrom, prev_snapshot.uri))
-                self.tracker.add_modification(
+            if not any(context.objects(curr_snapshot['uri'], PROV.wasDerivedFrom)):
+                context.add((curr_snapshot['uri'], PROV.wasDerivedFrom, prev_snapshot['uri']))
+                self._log_modification(
                     str(entity_uri),
                     "Added wasDerivedFrom",
-                    f"{str(curr_snapshot.uri)}{str(prev_snapshot.uri)}"
+                    f"{str(curr_snapshot['uri'])}{str(prev_snapshot['uri'])}"
                 )
                 modified = True
 
@@ -178,8 +181,7 @@ def _process_snapshots(self, context: ConjunctiveGraph, entity_uri: URIRef,
 
         return modified
 
-    def _process_timestamps(self, context: ConjunctiveGraph,
-                           snapshots: List[SnapshotInfo]) -> bool:
+    def _process_timestamps(self, context: ConjunctiveGraph, snapshots: List[dict]) -> bool:
         """Process all timestamps in batch."""
         modified = False
 
@@ -194,73 +196,76 @@ def _process_timestamps(self, context: ConjunctiveGraph,
         return modified
 
     def _handle_generation_time(self, context: ConjunctiveGraph,
-                               snapshots: List[SnapshotInfo], index: int) -> bool:
+                               snapshots: List[dict], index: int) -> bool:
         """Handle generation time for a snapshot."""
         modified = False
         snapshot = snapshots[index]
 
-        if len(snapshot.generation_times) != 1:
+        if len(snapshot['generation_times']) != 1:
             new_time = None
 
-            if snapshot.generation_times:
+            if snapshot['generation_times']:
                 self._remove_multiple_timestamps(
-                    context, snapshot.uri, PROV.generatedAtTime, snapshot.generation_times)
+                    context, snapshot['uri'], PROV.generatedAtTime, snapshot['generation_times'])
                 modified = True
 
             if index > 0:
                 prev_snapshot = snapshots[index-1]
-                if prev_snapshot.invalidation_times:
-                    new_time = prev_snapshot.invalidation_times[0]
-                elif (prev_snapshot.generation_times and
-                      snapshot.invalidation_times and len(snapshot.invalidation_times) == 1):
-                    prev_time = self._convert_to_utc(prev_snapshot.generation_times[0])
-                    curr_time = self._convert_to_utc(snapshot.invalidation_times[0])
+                if prev_snapshot['invalidation_times']:
+                    new_time = prev_snapshot['invalidation_times'][0]
+                elif (prev_snapshot['generation_times'] and
+                      snapshot['invalidation_times'] and
+                      len(snapshot['invalidation_times']) == 1):
+                    prev_time = self._convert_to_utc(prev_snapshot['generation_times'][0])
+                    curr_time = self._convert_to_utc(snapshot['invalidation_times'][0])
                     middle_time = prev_time + (curr_time - prev_time) / 2
                     new_time = Literal(middle_time.isoformat(), datatype=XSD.dateTime)
             else:
                 new_time = self._default_time
 
             if new_time:
-                context.add((snapshot.uri, PROV.generatedAtTime, new_time))
-                self.tracker.add_modification(
-                    str(snapshot.uri),
+                context.add((snapshot['uri'], PROV.generatedAtTime, new_time))
+                self._log_modification(
+                    str(snapshot['uri']),
                     "Added generatedAtTime",
-                    f"{str(snapshot.uri)}: {str(new_time)}"
+                    f"{str(new_time)}"
                 )
                 modified = True
 
         return modified
 
     def _handle_invalidation_time(self, context: ConjunctiveGraph,
-                                 snapshots: List[SnapshotInfo], index: int) -> bool:
+                                 snapshots: List[dict], index: int) -> bool:
         """Handle invalidation time for a snapshot."""
         modified = False
         snapshot = snapshots[index]
         next_snapshot = snapshots[index + 1]
 
-        if len(snapshot.invalidation_times) != 1:
-            if snapshot.invalidation_times:
+        if len(snapshot['invalidation_times']) != 1:
+            if snapshot['invalidation_times']:
                 self._remove_multiple_timestamps(
-                    context, snapshot.uri, PROV.invalidatedAtTime, snapshot.invalidation_times)
+                    context, snapshot['uri'], PROV.invalidatedAtTime,
+                    snapshot['invalidation_times']
+                )
                 modified = True
 
             new_time = None
-            if next_snapshot.generation_times:
-                if len(next_snapshot.generation_times) == 1:
-                    new_time = next_snapshot.generation_times[0]
+            if next_snapshot['generation_times']:
+                if len(next_snapshot['generation_times']) == 1:
+                    new_time = next_snapshot['generation_times'][0]
                 else:
                     earliest_time = min(
                         self._convert_to_utc(ts)
-                        for ts in next_snapshot.generation_times
+                        for ts in next_snapshot['generation_times']
                     )
                     new_time = Literal(earliest_time.isoformat(), datatype=XSD.dateTime)
 
             if new_time:
-                context.add((snapshot.uri, PROV.invalidatedAtTime, new_time))
-                self.tracker.add_modification(
-                    str(snapshot.uri),
+                context.add((snapshot['uri'], PROV.invalidatedAtTime, new_time))
+                self._log_modification(
+                    str(snapshot['uri']),
                     "Added invalidatedAtTime",
-                    f"{str(snapshot.uri)}: {str(new_time)}"
+                    f"{str(new_time)}"
                 )
                 modified = True
 
@@ -271,77 +276,60 @@ def process_chunk(file_chunk):
     for file in file_chunk:
         result = ProvenanceProcessor.process_file(file)
        if result:
-            results.append(result)
+            results.append(file)
     return results
 
 def main():
     parser = argparse.ArgumentParser(description="Fix provenance files in parallel")
     parser.add_argument('input_dir', type=str, help="Directory containing provenance files")
     parser.add_argument('--processes', type=int, default=cpu_count(),
                         help="Number of parallel processes (default: number of CPU cores)")
+    parser.add_argument('--log-dir', type=str, default='logs',
+                        help="Directory for log files (default: logs)")
     args = parser.parse_args()
 
+    setup_logging(args.log_dir)
+    process_logger = logging.getLogger('processing')
+
+    process_logger.info("Starting provenance fix process")
+
     prov_files = [
         os.path.join(root, file)
         for root, _, files in os.walk(args.input_dir)
         for file in files
         if file.endswith('se.zip')
     ]
 
-    # Create chunks of files to reduce concurrent I/O
-    chunk_size = 100  # Adjust based on your system
+    chunk_size = 100
     file_chunks = [prov_files[i:i + chunk_size]
                    for i in range(0, len(prov_files), chunk_size)]
 
-    all_results = []
-    with ProcessPool(max_workers=args.processes) as pool:
+    modified_files = []
+    with ProcessPool(max_workers=args.processes, max_tasks=1) as pool:
         future = pool.map(process_chunk, file_chunks)
         iterator = future.result()
 
-        # Setup progress bar
         with tqdm(total=len(file_chunks), desc="Fixing provenance files") as pbar:
             try:
                 while True:
                     try:
                         result = next(iterator)
-                        all_results.extend(result)
+                        modified_files.extend(result)
                         pbar.update(1)
                     except StopIteration:
                         break
                     except TimeoutError as error:
-                        print(f"\nChunk processing timed out: {error}")
+                        process_logger.error(f"Chunk processing timed out: {error}")
                     except Exception as error:
-                        print(f"\nError processing chunk: {error}")
+                        process_logger.error(f"Error processing chunk: {error}")
             except KeyboardInterrupt:
-                print("\nProcessing interrupted by user")
+                process_logger.warning("Processing interrupted by user")
                 future.cancel()
                 raise
 
-    print("\nProvenance Fix Report")
-    print("=" * 80)
-
-    modified_files = sum(1 for result in all_results if result is not None)
-
-    if modified_files == 0:
-        print("\nNo modifications were necessary in any file.")
-    else:
-        for result in all_results:
-            if result:
-                file_path, entity_mods = result
-                if any(mod_list for mods in entity_mods.values() for mod_list in mods.values()):
-                    print(f"\nFile: {file_path}")
-                    print("-" * 80)
-
-                    for entity_uri, modifications in entity_mods.items():
-                        if any(mod_list for mod_list in modifications.values()):
-                            print(f"\nEntity: {entity_uri}")
-                            for mod_type, mod_list in modifications.items():
-                                if mod_list:
-                                    print(f"\n  {mod_type}:")
-                                    for mod in mod_list:
-                                        print(f"    - {mod}")
-
-    print(f"\nTotal files modified: {modified_files}")
+    process_logger.info(f"Process completed. Total files modified: {len(modified_files)}")
+    print(f"\nProcessing complete. Check logs in {args.log_dir} directory.")
+    print(f"Total files modified: {len(modified_files)}")
 
 if __name__ == "__main__":
     main()
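For reference, a minimal usage sketch (not part of the commit) of the reworked entry points. The 'logs' directory and the archive path below are illustrative assumptions, and setup_logging must run in the same process for the named 'processing' and 'modifications' loggers to have file handlers attached.

# Hypothetical usage sketch; assumes the module path shown in this diff.
import logging

from oc_meta.run.fixer.prov.fix import ProvenanceProcessor, setup_logging

setup_logging('logs')  # creates logs/processing.log and logs/modifications.log

# process_file now returns True (archive rewritten), False (no changes needed)
# or None (error); each individual fix is logged to the 'modifications'
# logger as it happens instead of being accumulated in memory.
result = ProvenanceProcessor.process_file('out/rdf/prov/se.zip')  # illustrative path
if result:
    logging.getLogger('processing').info("Archive was rewritten with fixes")

Compared with the previous version, which accumulated every modification in a ModificationTracker and printed a report at the end, per-event logging keeps worker memory flat and preserves partial output if a chunk times out; max_tasks=1 additionally makes pebble recycle each worker process after a single task.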