Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Ensure that ProcessNode.get_builder_restart fully restores all inputs including metadata inputs #5801

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 4 additions & 22 deletions aiida/engine/processes/builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
from aiida.orm import Dict, Node
from aiida.orm.nodes.data.base import BaseType

from .utils import prune_mapping

if TYPE_CHECKING:
from aiida.engine.processes.process import Process

Expand Down Expand Up @@ -162,7 +164,7 @@ def __delattr__(self, item):

def _recursive_merge(self, dictionary, key, value):
"""Recursively merge the contents of ``dictionary`` setting its ``key`` to ``value``."""
if isinstance(value, Mapping):
if isinstance(value, Mapping) and key in dictionary:
for inner_key, inner_value in value.items():
self._recursive_merge(dictionary[key], inner_key, inner_value)
else:
Expand Down Expand Up @@ -219,30 +221,10 @@ def _inputs(self, prune: bool = False) -> dict:
:return: mapping of inputs ports and their input values.
"""
if prune:
return self._prune(dict(self))
return prune_mapping(dict(self))

return dict(self)

def _prune(self, value):
"""Prune a nested mapping from all mappings that are completely empty.

.. note:: a nested mapping that is completely empty means it contains at most other empty mappings. Other null
values, such as `None` or empty lists, should not be pruned.

:param value: a nested mapping of port values
:return: the same mapping but without any nested namespace that is completely empty.
"""
if isinstance(value, Mapping) and not isinstance(value, Node):
result = {}
for key, sub_value in value.items():
pruned = self._prune(sub_value)
# If `pruned` is an "empty'ish" mapping and not an instance of `Node`, skip it, otherwise keep it.
if not (isinstance(pruned, Mapping) and not pruned and not isinstance(pruned, Node)):
result[key] = pruned
return result

return value


class ProcessBuilder(ProcessBuilderNamespace): # pylint: disable=too-many-ancestors
"""A process builder that helps setting up the inputs for creating a new process."""
Expand Down
4 changes: 2 additions & 2 deletions aiida/engine/processes/functions.py
Original file line number Diff line number Diff line change
Expand Up @@ -421,8 +421,8 @@ def run(self) -> Optional['ExitCode']:

for name, value in (self.inputs or {}).items():
try:
if self.spec().inputs[name].non_db: # type: ignore[union-attr]
# Don't consider non-database inputs
if self.spec().inputs[name].is_metadata: # type: ignore[union-attr]
# Don't consider ports that defined ``is_metadata=True``
continue
except KeyError:
pass # No port found
Expand Down
74 changes: 57 additions & 17 deletions aiida/engine/processes/ports.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,13 +30,7 @@


class WithNonDb:
"""
A mixin that adds support to a port to flag that it should not be stored
in the database using the non_db=True flag.

The mixins have to go before the main port class in the superclass order
to make sure the mixin has the chance to strip out the non_db keyword.
"""
"""A mixin that adds support to a port to flag it should not be stored in the database using the ``non_db`` flag."""

def __init__(self, *args, **kwargs) -> None:
self._non_db_explicitly_set: bool = bool('non_db' in kwargs)
Expand All @@ -46,28 +40,66 @@ def __init__(self, *args, **kwargs) -> None:

@property
def non_db_explicitly_set(self) -> bool:
"""Return whether the a value for `non_db` was explicitly passed in the construction of the `Port`.
"""Return whether the ``non_db`` keyword was explicitly passed in the construction of the ``InputPort``.

:return: boolean, True if `non_db` was explicitly defined during construction, False otherwise
:return: ``True`` if ``non_db`` was explicitly defined during construction, ``False`` otherwise
"""
return self._non_db_explicitly_set

@property
def non_db(self) -> bool:
"""Return whether the value of this `Port` should be stored as a `Node` in the database.
"""Return whether the value of this ``InputPort`` should be stored in the database.

:return: boolean, True if it should be storable as a `Node`, False otherwise
:return: ``True`` if it should be stored, ``False`` otherwise
"""
return self._non_db

@non_db.setter
def non_db(self, non_db: bool) -> None:
"""Set whether the value of this `Port` should be stored as a `Node` in the database.
"""
"""Set whether the value of this ``InputPort`` should be stored as a ``Data`` in the database."""
self._non_db_explicitly_set = True
self._non_db = non_db


class WithMetadata:
"""A mixin that allows an input port to be marked as metadata through the keyword ``is_metadata``.

A metadata input differs from a normal input as in that it is not linked to the ``ProcessNode`` as a ``Data`` node
but rather is stored on the ``ProcessNode`` itself (as an attribute, for example).
"""

def __init__(self, *args, **kwargs) -> None:
self._explicitly_set: bool = bool('is_metadata' in kwargs)
is_metadata = kwargs.pop('is_metadata', False)
super().__init__(*args, **kwargs)
ltalirz marked this conversation as resolved.
Show resolved Hide resolved
self._is_metadata: bool = is_metadata

@property
def is_metadata_explicitly_set(self) -> bool:
"""Return whether the ``is_metadata`` keyword was explicitly passed in the construction of the ``InputPort``.

:return: ``True`` if ``is_metadata`` was explicitly defined during construction, ``False`` otherwise
"""
return self._explicitly_set

@property
def is_metadata(self) -> bool:
"""Return whether the value of this ``InputPort`` should be stored as a ``Node`` in the database.

:return: ``True`` if it should be storable as a ``Node``, ``False`` otherwise
"""
return self._is_metadata

@is_metadata.setter
def is_metadata(self, is_metadata: bool) -> None:
"""Set whether the value of this ``InputPort`` should be stored as a ``Node`` in the database.

:param is_metadata: ``False`` if the port value should be linked as a ``Node``, ``True`` otherwise.
"""
self._explicitly_set = True
self._is_metadata = is_metadata


class WithSerialize:
"""
A mixin that adds support for a serialization function which is automatically applied on inputs
Expand All @@ -91,10 +123,13 @@ def serialize(self, value: Any) -> 'Data':
return self._serializer(value)


class InputPort(WithSerialize, WithNonDb, ports.InputPort):
class InputPort(WithMetadata, WithSerialize, WithNonDb, ports.InputPort):
"""
Sub class of plumpy.InputPort which mixes in the WithSerialize and WithNonDb mixins to support automatic
value serialization to database storable types and support non database storable input types as well.

The mixins have to go before the main port class in the superclass order to make sure they have the chance to
process their specific keywords.
"""

def __init__(self, *args, **kwargs) -> None:
Expand All @@ -121,12 +156,12 @@ def __init__(self, *args, **kwargs) -> None:
super().__init__(*args, **kwargs)

def get_description(self) -> Dict[str, str]:
"""
Return a description of the InputPort, which will be a dictionary of its attributes
"""Return a description of the InputPort, which will be a dictionary of its attributes

:returns: a dictionary of the stringified InputPort attributes
"""
description = super().get_description()
description['is_metadata'] = f'{self.is_metadata}'
description['non_db'] = f'{self.non_db}'

return description
Expand All @@ -145,7 +180,7 @@ def pass_to_parser(self) -> bool:
return self._pass_to_parser


class PortNamespace(WithNonDb, ports.PortNamespace):
class PortNamespace(WithMetadata, WithNonDb, ports.PortNamespace):
"""
Sub class of plumpy.PortNamespace which implements the serialize method to support automatic recursive
serialization of a given mapping onto the ports of the PortNamespace.
Expand All @@ -165,6 +200,11 @@ def __setitem__(self, key: str, port: ports.Port) -> None:

self.validate_port_name(key)

if hasattr(
port, 'is_metadata_explicitly_set'
) and not port.is_metadata_explicitly_set: # type: ignore[attr-defined]
port.is_metadata = self.is_metadata # type: ignore[attr-defined]

if hasattr(port, 'non_db_explicitly_set') and not port.non_db_explicitly_set: # type: ignore[attr-defined]
port.non_db = self.non_db # type: ignore[attr-defined]

Expand Down
65 changes: 62 additions & 3 deletions aiida/engine/processes/process.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,12 +48,14 @@
from aiida.common.lang import classproperty, override
from aiida.common.links import LinkType
from aiida.common.log import LOG_LEVEL_REPORT
from aiida.orm.implementation.utils import clean_value
from aiida.orm.utils import serialize

from .builder import ProcessBuilder
from .exit_code import ExitCode, ExitCodesNamespace
from .ports import PORT_NAMESPACE_SEPARATOR, InputPort, OutputPort, PortNamespace
from .process_spec import ProcessSpec
from .utils import prune_mapping

if TYPE_CHECKING:
from aiida.engine.runners import Runner
Expand Down Expand Up @@ -92,7 +94,7 @@ def define(cls, spec: ProcessSpec) -> None: # type: ignore[override]

"""
super().define(spec)
spec.input_namespace(spec.metadata_key, required=False, non_db=True)
spec.input_namespace(spec.metadata_key, required=False, is_metadata=True)
spec.input(
f'{spec.metadata_key}.store_provenance',
valid_type=bool,
Expand Down Expand Up @@ -745,6 +747,16 @@ def _setup_metadata(self, metadata: dict) -> None:
else:
raise RuntimeError(f'unsupported metadata key: {name}')

# Store JSON-serializable values of ``metadata`` ports in the node's attributes. Note that instead of passing in
# the ``metadata`` inputs directly, the entire namespace of raw inputs is passed. The reason is that although
# currently in ``aiida-core`` all input ports with ``is_metadata=True`` in the port specification are located
# within the ``metadata`` port namespace, this may not always be the case. The ``_filter_serializable_metadata``
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you might want to mention how you plan to use this rather than just that it could be used.
(or just remember to update this comment when you add your next PR)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This essentially pertains to my last comment regarding the last remaining open question. Should we allow other plugins to add metadata ports and have them store data in the process node's attributes? If so, then this is necessary. If we don't, then we may still want to keep this although it can be done differently. I am not sure what the answer to the question is though

# method will filter out all ports that set ``is_metadata=True`` no matter where in the namespace they are
# defined so this approach is more robust for the future.
serializable_inputs = self._filter_serializable_metadata(self.spec().inputs, self.raw_inputs)
pruned = prune_mapping(serializable_inputs)
self.node.set_metadata_inputs(pruned)

def _setup_inputs(self) -> None:
"""Create the links between the input nodes and the ProcessNode that represents this process."""
for name, node in self._flat_inputs().items():
Expand All @@ -760,6 +772,51 @@ def _setup_inputs(self) -> None:
elif isinstance(self.node, orm.WorkflowNode):
self.node.base.links.add_incoming(node, LinkType.INPUT_WORK, name)

def _filter_serializable_metadata(
self,
port: Union[None, InputPort, PortNamespace],
port_value: Any,
) -> Union[Any, None]:
"""Return the inputs that correspond to ports with ``is_metadata=True`` and that are JSON serializable.

The function is called recursively for any port namespaces.

sphuber marked this conversation as resolved.
Show resolved Hide resolved
:param port: An ``InputPort`` or ``PortNamespace``. If an ``InputPort`` that specifies ``is_metadata=True`` the
``port_value`` is returned. For a ``PortNamespace`` this method is called recursively for the keys within
the namespace and the resulting dictionary is returned, omitting ``None`` values. If either ``port`` or
``port_value`` is ``None``, ``None`` is returned.
:return: The ``port_value`` where all inputs that do no correspond to a metadata port or are not JSON
serializable, have been filtered out.
"""
if port is None or port_value is None:
return None

if isinstance(port, InputPort):
if not port.is_metadata:
return None

try:
clean_value(port_value)
except exceptions.ValidationError:
return None
else:
return port_value

result = {}

for key, value in port_value.items():
if key not in port:
continue

metadata_value = self._filter_serializable_metadata(port[key], value) # type: ignore[arg-type]

if metadata_value is None:
continue

result[key] = metadata_value

return result or None

def _flat_inputs(self) -> Dict[str, Any]:
"""
Return a flattened version of the parsed inputs dictionary.
Expand Down Expand Up @@ -802,7 +859,9 @@ def _flatten_inputs(
:return: flat list of inputs

"""
if (port is None and isinstance(port_value, orm.Node)) or (isinstance(port, InputPort) and not port.non_db):
if (port is None and
isinstance(port_value,
orm.Node)) or (isinstance(port, InputPort) and not (port.is_metadata or port.non_db)):
return [(parent_name, port_value)]

if port is None and isinstance(port_value, Mapping) or isinstance(port, PortNamespace):
Expand All @@ -822,7 +881,7 @@ def _flatten_inputs(
items.extend(sub_items)
return items

assert (port is None) or (isinstance(port, InputPort) and port.non_db)
assert (port is None) or (isinstance(port, InputPort) and (port.is_metadata or port.non_db))
return []

def _flatten_outputs(
Expand Down
26 changes: 26 additions & 0 deletions aiida/engine/processes/utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
# -*- coding: utf-8 -*-
"""Module with utilities."""
from collections.abc import Mapping

from aiida.orm import Node


def prune_mapping(value):
"""Prune a nested mapping from all mappings that are completely empty.

.. note:: A nested mapping that is completely empty means it contains at most other empty mappings. Other null
values, such as `None` or empty lists, should not be pruned.

:param value: A nested mapping of port values.
:return: The same mapping but without any nested namespace that is completely empty.
"""
if isinstance(value, Mapping) and not isinstance(value, Node):
result = {}
for key, sub_value in value.items():
pruned = prune_mapping(sub_value)
# If `pruned` is an "empty'ish" mapping and not an instance of `Node`, skip it, otherwise keep it.
if not (isinstance(pruned, Mapping) and not pruned and not isinstance(pruned, Node)):
result[key] = pruned
return result

return value
15 changes: 0 additions & 15 deletions aiida/orm/nodes/process/calculation/calcjob.py
Original file line number Diff line number Diff line change
Expand Up @@ -142,21 +142,6 @@ def _hash_ignored_attributes(cls) -> Tuple[str, ...]: # pylint: disable=no-self
'max_memory_kb',
)

def get_builder_restart(self) -> 'ProcessBuilder':
"""Return a `ProcessBuilder` that is ready to relaunch the same `CalcJob` that created this node.

The process class will be set based on the `process_type` of this node and the inputs of the builder will be
prepopulated with the inputs registered for this node. This functionality is very useful if a process has
completed and you want to relaunch it with slightly different inputs.

In addition to prepopulating the input nodes, which is implemented by the base `ProcessNode` class, here we
also add the `options` that were passed in the `metadata` input of the `CalcJob` process.

"""
builder = super().get_builder_restart()
builder.metadata.options = self.get_options() # type: ignore[attr-defined]
return builder

@property
def is_imported(self) -> bool:
"""Return whether the calculation job was imported instead of being an actual run."""
Expand Down
Loading