Skip to content

Commit

Permalink
multiple snapshots missing
Browse files (browse the repository at this point in the history)
  • Loading branch information
arcangelo7 committed Nov 1, 2024
1 parent 34ba540 commit 995291b
Show file tree
Hide file tree
Showing 2 changed files with 95 additions and 30 deletions.
45 changes: 31 additions & 14 deletions oc_meta/run/fixer/prov/fix.py
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ def _collect_snapshot_info(self, context: ConjunctiveGraph) -> List[dict]:
return sorted_snapshots

def _fill_missing_snapshots(self, context: ConjunctiveGraph,
snapshots: List[dict], base_uri: str) -> List[dict]:
snapshots: List[dict], base_uri: str) -> List[dict]:
"""Fill in missing snapshots in the sequence."""
if not snapshots:
return snapshots
Expand All @@ -145,47 +145,64 @@ def _fill_missing_snapshots(self, context: ConjunctiveGraph,
max_num = max(s['number'] for s in snapshots)
min_num = min(s['number'] for s in snapshots)
existing_numbers = {s['number'] for s in snapshots}
existing_snapshots = {s['number']: s for s in snapshots}

for i in range(min_num, max_num + 1):
if i in existing_numbers:
filled_snapshots.append(next(s for s in snapshots if s['number'] == i))
filled_snapshots.append(existing_snapshots[i])
else:
# Create missing snapshot
missing_uri = URIRef(f"{base_uri}/se/{i}")

# Trova il primo snapshot precedente disponibile
prev_num = i - 1
while prev_num >= min_num and prev_num not in existing_numbers:
prev_num -= 1
prev_snapshot = existing_snapshots.get(prev_num)

# Trova il primo snapshot successivo disponibile
next_num = i + 1
while next_num <= max_num and next_num not in existing_numbers:
next_num += 1
next_snapshot = existing_snapshots.get(next_num)

missing_snapshot = self._create_missing_snapshot(
context, missing_uri, i,
next(s for s in snapshots if s['number'] == i-1),
next(s for s in snapshots if s['number'] == i+1)
prev_snapshot, next_snapshot
)
filled_snapshots.append(missing_snapshot)

return sorted(filled_snapshots, key=lambda x: x['number'])

def _create_missing_snapshot(self, context: ConjunctiveGraph,
missing_uri: URIRef, number: int,
prev_snapshot: dict, next_snapshot: dict) -> dict:
missing_uri: URIRef, number: int,
prev_snapshot: Optional[dict],
next_snapshot: Optional[dict]) -> dict:
"""Create a missing snapshot with basic information."""
entity_uri = URIRef(self._get_entity_from_prov_graph(str(missing_uri.split('se')[0])))

entity_uri = URIRef(self._get_entity_from_prov_graph(str(missing_uri).split('se')[0]))
# Add basic triples for the missing snapshot
context.add((missing_uri, RDF.type, PROV.Entity))
context.add((missing_uri, PROV.specializationOf, entity_uri))
context.add((missing_uri, PROV.wasDerivedFrom, prev_snapshot['uri']))

# Add wasDerivedFrom if we have a previous snapshot
if prev_snapshot:
context.add((missing_uri, PROV.wasDerivedFrom, prev_snapshot['uri']))

generation_time = None
invalidation_time = None

# Try to infer timestamps
if prev_snapshot['invalidation_times']:
if prev_snapshot and prev_snapshot['invalidation_times']:
generation_time = prev_snapshot['invalidation_times'][0]
elif prev_snapshot['generation_times'] and next_snapshot['generation_times']:
elif prev_snapshot and prev_snapshot['generation_times'] and next_snapshot and next_snapshot['generation_times']:
# Calculate a time between prev generation and next generation
prev_time = self._convert_to_utc(prev_snapshot['generation_times'][0])
next_time = self._convert_to_utc(next_snapshot['generation_times'][0])
middle_time = prev_time + (next_time - prev_time) / 2
generation_time = Literal(middle_time.isoformat(), datatype=XSD.dateTime)
if next_snapshot['generation_times']:

if next_snapshot and next_snapshot['generation_times']:
invalidation_time = next_snapshot['generation_times'][0]

# Add timestamps if we could infer them
Expand Down
80 changes: 64 additions & 16 deletions test/fix_provenance_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -443,23 +443,71 @@ def test_multiple_missing_snapshots(self):
fixed_data = json.loads(f.read())

graph_data = fixed_data[0]['@graph']

# Verify all snapshots exist
snapshot_ids = {item['@id'] for item in graph_data}
expected_ids = {
f"https://w3id.org/oc/meta/br/06504122264/prov/se/{i}"
for i in range(1, 6)
}
self.assertEqual(snapshot_ids, expected_ids)

# Verify the chain of wasDerivedFrom relationships
for i in range(2, 6):
curr_snapshot = next(item for item in graph_data
if item['@id'].endswith(f'/se/{i}'))
self.assertIn('http://www.w3.org/ns/prov#wasDerivedFrom', curr_snapshot)
print(json.dumps(fixed_data, indent=4))
# Raccoglie gli snapshot e i loro numeri
snapshots = {}
for item in graph_data:
if '/prov/se/' in item['@id']:
num = int(item['@id'].split('/se/')[-1])
snapshots[num] = item

# Verifica che tutti gli snapshot abbiano le proprietà di base
for num, snapshot in snapshots.items():
# Verifica tipo
self.assertIn('@type', snapshot)
self.assertIn('http://www.w3.org/ns/prov#Entity', snapshot['@type'])

# Verifica specializationOf
self.assertIn('http://www.w3.org/ns/prov#specializationOf', snapshot)
self.assertEqual(
snapshot['http://www.w3.org/ns/prov#specializationOf'][0]['@id'],
"https://w3id.org/oc/meta/br/06504122264"
)

# Verifica timestamp
self.assertIn('http://www.w3.org/ns/prov#generatedAtTime', snapshot)
gen_time = snapshot['http://www.w3.org/ns/prov#generatedAtTime'][0]['@value']
self.assertTrue('+00:00' in gen_time or 'Z' in gen_time)

# Verifica wasDerivedFrom per tutti tranne il primo snapshot
if num > min(snapshots.keys()):
self.assertIn('http://www.w3.org/ns/prov#wasDerivedFrom', snapshot)

# Verifica la consistenza temporale
ordered_nums = sorted(snapshots.keys())
for i in range(len(ordered_nums)-1):
curr_num = ordered_nums[i]
next_num = ordered_nums[i+1]

curr_snapshot = snapshots[curr_num]
next_snapshot = snapshots[next_num]

# Se lo snapshot corrente ha un tempo di invalidazione
if 'http://www.w3.org/ns/prov#invalidatedAtTime' in curr_snapshot:
curr_inv_time = self.processor._convert_to_utc(
curr_snapshot['http://www.w3.org/ns/prov#invalidatedAtTime'][0]['@value']
)
next_gen_time = self.processor._convert_to_utc(
next_snapshot['http://www.w3.org/ns/prov#generatedAtTime'][0]['@value']
)
self.assertEqual(
curr_inv_time,
next_gen_time,
f"Invalidation time of snapshot {curr_num} should match generation time of {next_num}"
)

# Verifica che gli snapshot siano collegati correttamente
for num in ordered_nums[1:]: # Skip the first one
curr_snapshot = snapshots[num]
prev_num = ordered_nums[ordered_nums.index(num) - 1]

# Verifica che wasDerivedFrom punti allo snapshot precedente
derived_from = curr_snapshot['http://www.w3.org/ns/prov#wasDerivedFrom'][0]['@id']
expected_derived = f"https://w3id.org/oc/meta/br/06504122264/prov/se/{prev_num}"
self.assertEqual(
curr_snapshot['http://www.w3.org/ns/prov#wasDerivedFrom'][0]['@id'],
f"https://w3id.org/oc/meta/br/06504122264/prov/se/{i-1}"
derived_from,
expected_derived,
f"Snapshot {num} should be derived from snapshot {prev_num}"
)

def test_timestamp_inference(self):
Expand Down

0 comments on commit 995291b

Please sign in to comment.