Skip to content

Commit

Permalink
Connect provenance through nipype nodes
Browse files Browse the repository at this point in the history
  • Loading branch information
omar-rifai committed Feb 23, 2022
1 parent 5259a06 commit ce07366
Show file tree
Hide file tree
Showing 4 changed files with 221 additions and 203 deletions.
3 changes: 3 additions & 0 deletions clinica/engine/__init__.py
Original file line number Diff line number Diff line change
@@ -1 +1,4 @@
from .cmdparser import CmdParser
from nipype import config

config.enable_debug_mode()
152 changes: 96 additions & 56 deletions clinica/engine/prov_utils.py
Original file line number Diff line number Diff line change
@@ -1,65 +1,59 @@
from pathlib import Path
from typing import List, Optional
from clinica.engine.prov_model import *

from clinica.utils.input_files import pet_linear_nii

from .prov_model import *
def mint_agent() -> ProvAgent:
"""
return
ProvAgent associated with running version of the software
"""
from clinica import __name__, __version__
from clinica.engine.prov_utils import generate_agent_id

new_agent = ProvAgent(uid=generate_agent_id())

new_agent.attributes["version"] = __version__
new_agent.attributes["label"] = __name__

return new_agent


def get_files_list(
self, pipeline_fullname: str, dict_field="input_to", pipeline_args={}
) -> List[Path]:
def mint_activity(agent: Identifier, entities: List[ProvEntity]) -> ProvActivity:
"""
params:
pipeline_fullname: the current running pipeline name
dict_field: variable to specify if fetching inputs or outputs to the pipeline
return
ProvActivity from related entities and associated agent
"""
import sys

from clinica.engine.prov_utils import generate_activity_id

new_activity = ProvActivity(uid=generate_activity_id("testfullname"))

return list of 'Path's to the files used in the pipeline
new_activity.attributes["parameters"] = "testparameters"
new_activity.attributes["label"] = "testfullname"
new_activity.attributes["command"] = sys.argv[1:]
new_activity.attributes["used"] = [str(x.uid) for x in entities]
new_activity.attributes["wasAssociatedWith"] = str(agent.uid)

return new_activity


def mint_entity(path_curr: Path) -> ProvEntity:
"""
return an Entity object from the file in path_curr
"""
import clinica.utils.input_files as cif
from clinica.utils.input_files import pet_linear_nii
from clinica.utils.inputs import clinica_file_reader

funcs = {"pet-linear": pet_linear_nii}
from clinica.engine.prov_utils import generate_entity_id, get_last_activity

dict_field_options = ["input_to", "output_from"]
if dict_field not in dict_field_options:
raise (f"dict_field must be one of {dict_field_options}")
new_entity = ProvEntity(uid=generate_entity_id(path_curr))
new_entity.attributes["label"] = path_curr.name
new_entity.attributes["path"] = str(path_curr)

# Retrieve all the data dict from the input_files module
# TODO: implement function to return the latest associated activity
new_entity.attributes["wasGeneratedBy"] = get_last_activity(path_curr)

if pipeline_fullname in funcs and dict_field == "output_from":
files_dicts = {
"PET": funcs[pipeline_fullname](
**clean_arguments(pipeline_args, funcs[pipeline_fullname])
)
}
else:
files_dicts = {
k: v
for k, v in vars(cif).items()
if isinstance(v, dict)
and dict_field in v.keys()
and pipeline_fullname in v[dict_field]
}
# TODO: check if bids or caps as output

ret_files = []
for elem in files_dicts:
ref_dir = (
self.bids_directory if dict_field == "input_to" else self.caps_directory
)
current_file, _ = clinica_file_reader(
self.subjects,
self.sessions,
ref_dir,
files_dicts[elem],
raise_exception=True,
)
if current_file:
ret_files.extend([Path(x) for x in current_file])

return ret_files
return new_entity


def generate_entity_id(path_file: Path) -> Identifier:
Expand Down Expand Up @@ -97,12 +91,13 @@ def get_path_prov(path_entity: Path) -> Path:
"""
return: Path of the provenance file associated with an entity
"""

while path_entity.suffix != "":
path_entity = path_entity.with_suffix("")

path_prov = path_entity.with_suffix(".jsonld")
return path_prov
if path_entity.is_file():
while path_entity.suffix != "":
path_entity = path_entity.with_suffix("")
path_prov = path_entity.with_suffix(".jsonld")
return path_prov
else:
return None


def create_prov_file(prov_command, prov_path):
Expand All @@ -122,7 +117,7 @@ def read_prov_jsonld(path_prov: Path) -> Optional[ProvRecord]:
return: ProvRecord in a specific location stored in jsonld format
"""

if path_prov.exists():
if path_prov and path_prov.exists():
prov_record = deserialize_jsonld(path_prov)
return prov_record

Expand Down Expand Up @@ -183,3 +178,48 @@ def clean_arguments(pipeline_args, file_func):
if key not in argspec.args:
del pipeline_args[key]
return pipeline_args


def validate_command(prov_history: ProvRecord, prov_current: ProvRecord) -> bool:
"""
Check the command is valid on the data being run
"""
flag = True

for a in prov_history.elements:
for b in prov_current.elements:
# TODO: check that the record entries are compatible with the current entry
flag = True
return flag


def is_valid(command: dict) -> bool:
valid_list = [
{
("clin:clinica0.5.0", "clin:adni2Bids"): (
"clin:clinica0.5.0",
"clin:t1-linear",
)
}
]
if command in valid_list:
return True
return False


def write_prov_file(
list_prov_entries: ProvRecord, path_entity: Path, overwrite=False
) -> None:
"""
Create provenance file with current pipeline information
params:
prov_entries: list of ProvEntry
entity_path: path of the prov-associated element
"""

prov_path = get_path_prov(path_entity)

create_prov_file(list_prov_entries, prov_path)

return
Loading

0 comments on commit ce07366

Please sign in to comment.