DOC: Use upstream/downstream instead of source/destination (#316)
* DOC: Use upstream/downstream instead of source/destination

For monai.deploy.core.Operator, instead of sometimes using "upstream"
and "downstream" and sometimes using "source" and "destination",
consistently use the former.

Signed-off-by: Lee Newberg <[email protected]>

* DOC: Use source/destination instead of upstream/downstream

For monai.deploy.core.Operator, instead of sometimes using "upstream"
and "downstream" and sometimes using "source" and "destination",
consistently use the latter.

Signed-off-by: Lee Newberg <[email protected]>
Leengit authored Aug 1, 2022
1 parent 0a8af47 commit 2526579
Showing 11 changed files with 44 additions and 44 deletions.
4 changes: 2 additions & 2 deletions docs/source/developing_with_sdk/creating_application_class.md
@@ -52,11 +52,11 @@ The resource requirements (such as `cpu`, `memory`, and `gpu`) for the application

In the `compose()` method, operators are instantiated and connected through <a href="../modules/_autosummary/monai.deploy.core.Application.html#monai.deploy.core.Application.add_flow">self.add_flow()</a>.

- > add_flow(upstream_op, downstream_op, io_map=None)
+ > add_flow(source_op, destination_op, io_map=None)
`io_map` is a dictionary mapping the source operator's output label to the destination operator's input label(s), and its type is `Dict[str, str|Set[str]]`.

- We can skip specifying `io_map` if both the number of `upstream_op`'s outputs and the number of `downstream_op`'s inputs are one.
+ We can skip specifying `io_map` if both the number of `source_op`'s outputs and the number of `destination_op`'s inputs are one.
For example, if operators named `task1` and `task2` have only one input and output (with the label `image`), `self.add_flow(task1, task2)` is the same as `self.add_flow(task1, task2, {"image": "image"})` or `self.add_flow(task1, task2, {"image": {"image"}})`.

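Below is a minimal sketch (not part of this commit) of the three equivalent `io_map` forms described above; the operator classes and the `image` label are hypothetical, and the decorator-based port declaration follows the SDK 0.x style seen elsewhere in this diff.

```python
import numpy as np

import monai.deploy.core as md
from monai.deploy.core import Application, ExecutionContext, InputContext, IOType, Operator, OutputContext


@md.output("image", np.ndarray, IOType.IN_MEMORY)
class Task1Operator(Operator):  # hypothetical source operator with one output port
    def compute(self, op_input: InputContext, op_output: OutputContext, context: ExecutionContext):
        op_output.set(np.zeros((8, 8)), "image")  # emit a dummy image


@md.input("image", np.ndarray, IOType.IN_MEMORY)
class Task2Operator(Operator):  # hypothetical destination operator with one input port
    def compute(self, op_input: InputContext, op_output: OutputContext, context: ExecutionContext):
        print(op_input.get("image").shape)  # consume the image


class MyApp(Application):
    def compose(self):
        task1, task2 = Task1Operator(), Task2Operator()
        # One output port meets one input port, so io_map may be omitted;
        # the two commented calls are equivalent explicit spellings.
        self.add_flow(task1, task2)
        # self.add_flow(task1, task2, {"image": "image"})
        # self.add_flow(task1, task2, {"image": {"image"}})
```
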
4 changes: 2 additions & 2 deletions docs/srs.md
@@ -89,7 +89,7 @@ MONAI Deploy App SDK 0.1.0

## [REQ] Representing Workflow With DAG

- The SDK shall enable dependencies among upstream and downstream operators in an application using a DAG so that app workflow can be modeled unambiguously. The SDK shall provide a mechanism to link an output port of an upstream operator to an input port of a downstream operator to form the DAG.
+ The SDK shall enable dependencies among source and destination operators in an application using a DAG so that app workflow can be modeled unambiguously. The SDK shall provide a mechanism to link an output port of a source operator to an input port of a destination operator to form the DAG.
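
As a hedged illustration of this requirement (hypothetical operators, not SRS text), each `add_flow()` call below adds one edge to the DAG by linking an output port of a source operator to an input port of a destination operator; the writer receives inputs from two different sources without forming a cycle.

```python
import numpy as np

import monai.deploy.core as md
from monai.deploy.core import Application, IOType, Operator


@md.output("image", np.ndarray, IOType.IN_MEMORY)
class LoaderOp(Operator):
    def compute(self, op_input, op_output, context):
        op_output.set(np.ones((4, 4)), "image")


@md.input("image", np.ndarray, IOType.IN_MEMORY)
@md.output("pred", np.ndarray, IOType.IN_MEMORY)
class SegOp(Operator):
    def compute(self, op_input, op_output, context):
        op_output.set((op_input.get("image") > 0).astype(np.uint8), "pred")


@md.input("image", np.ndarray, IOType.IN_MEMORY)
@md.input("pred", np.ndarray, IOType.IN_MEMORY)
class WriterOp(Operator):
    def compute(self, op_input, op_output, context):
        print(op_input.get("image").sum(), op_input.get("pred").sum())


class BranchingApp(Application):
    def compose(self):
        loader, seg, writer = LoaderOp(), SegOp(), WriterOp()
        self.add_flow(loader, seg, {"image": "image"})     # edge 1: loader -> seg
        self.add_flow(loader, writer, {"image": "image"})  # edge 2: loader -> writer (branch)
        self.add_flow(seg, writer, {"pred": "pred"})       # edge 3: seg -> writer (join)
```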

### Background

@@ -397,7 +397,7 @@ MONAI Deploy App SDK 0.1.0

## [REQ] Loading a DICOM 2d/3d dataset into a unified domain object

- The SDK shall enable applications to load a 2D/3D imaging dataset belonging to a single DICOM series into a unified "Image" domain object so that downstream operators can process this domain object based on the application's needs such as transformation and inference.
+ The SDK shall enable applications to load a 2D/3D imaging dataset belonging to a single DICOM series into a unified "Image" domain object so that destination operators can process this domain object based on the application's needs such as transformation and inference.

### Background

4 changes: 2 additions & 2 deletions examples/apps/ai_livertumor_seg_app/app.py
@@ -58,7 +58,7 @@ def compose(self):
"Tumor",
]
)
- # Create the processing pipeline, by specifying the upstream and downstream operators, and
+ # Create the processing pipeline, by specifying the source and destination operators, and
# ensuring the output from the former matches the input of the latter, in both name and type.
self.add_flow(study_loader_op, series_selector_op, {"dicom_study_list": "dicom_study_list"})
self.add_flow(
@@ -68,7 +68,7 @@ def compose(self):
# Add the publishing operator to save the input and seg images for Render Server.
# Note the PublisherOperator has temp impl till a proper rendering module is created.
self.add_flow(unetr_seg_op, publisher_op, {"saved_images_folder": "saved_images_folder"})
- # Note below the dicom_seg_writer requires two inputs, each coming from a upstream operator.
+ # Note below the dicom_seg_writer requires two inputs, each coming from a source operator.
self.add_flow(
series_selector_op, dicom_seg_writer, {"study_selected_series_list": "study_selected_series_list"}
)
4 changes: 2 additions & 2 deletions examples/apps/ai_spleen_seg_app/app.py
@@ -64,14 +64,14 @@ def compose(self):
# Create DICOM Seg writer with segment label name in a string list
dicom_seg_writer = DICOMSegmentationWriterOperator(seg_labels=["Spleen"])

- # Create the processing pipeline, by specifying the upstream and downstream operators, and
+ # Create the processing pipeline, by specifying the source and destination operators, and
# ensuring the output from the former matches the input of the latter, in both name and type.
self.add_flow(study_loader_op, series_selector_op, {"dicom_study_list": "dicom_study_list"})
self.add_flow(
series_selector_op, series_to_vol_op, {"study_selected_series_list": "study_selected_series_list"}
)
self.add_flow(series_to_vol_op, bundle_spleen_seg_op, {"image": "image"})
- # Note below the dicom_seg_writer requires two inputs, each coming from a upstream operator.
+ # Note below the dicom_seg_writer requires two inputs, each coming from a source operator.
self.add_flow(
series_selector_op, dicom_seg_writer, {"study_selected_series_list": "study_selected_series_list"}
)
2 changes: 1 addition & 1 deletion examples/apps/ai_unetr_seg_app/app.py
@@ -54,7 +54,7 @@ def compose(self):
output_file="stl/multi-organs.stl", keep_largest_connected_component=False
)

- # Create the processing pipeline, by specifying the upstream and downstream operators, and
+ # Create the processing pipeline, by specifying the source and destination operators, and
# ensuring the output from the former matches the input of the latter, in both name and type.
self.add_flow(study_loader_op, series_selector_op, {"dicom_study_list": "dicom_study_list"})
self.add_flow(
42 changes: 21 additions & 21 deletions monai/deploy/core/application.py
@@ -182,35 +182,35 @@ def add_operator(self, operator: Operator):
self._graph.add_operator(operator)

def add_flow(
- self, upstream_op: Operator, downstream_op: Operator, io_map: Optional[Dict[str, Union[str, Set[str]]]] = None
+ self, source_op: Operator, destination_op: Operator, io_map: Optional[Dict[str, Union[str, Set[str]]]] = None
):
"""Adds a flow from upstream to downstream.
"""Adds a flow from source to destination.
An output port of the upstream operator is connected to one of the
input ports of a downstream operators.
An output port of the source operator is connected to one of the
input ports of a destination operators.
Args:
- upstream_op (Operator): An instance of the upstream operator of type Operator.
- downstream_op (Operator): An instance of the downstream operator of type Operator.
+ source_op (Operator): An instance of the source operator of type Operator.
+ destination_op (Operator): An instance of the destination operator of type Operator.
io_map (Optional[Dict[str, Union[str, Set[str]]]]): A dictionary mapping the source operator's
output label(s) to the destination operator's input label(s).
"""

- # Ensure that the upstream and downstream operators are valid
- upstream_op.ensure_valid()
- downstream_op.ensure_valid()
+ # Ensure that the source and destination operators are valid
+ source_op.ensure_valid()
+ destination_op.ensure_valid()

- op_output_labels = upstream_op.op_info.get_labels(IO.OUTPUT)
- op_input_labels = downstream_op.op_info.get_labels(IO.INPUT)
+ op_output_labels = source_op.op_info.get_labels(IO.OUTPUT)
+ op_input_labels = destination_op.op_info.get_labels(IO.INPUT)
if not io_map:
if len(op_output_labels) > 1:
raise IOMappingError(
f"The upstream operator has more than one output port "
f"The source operator has more than one output port "
f"({', '.join(op_output_labels)}) so mapping should be specified explicitly!"
)
if len(op_input_labels) > 1:
raise IOMappingError(
f"The downstream operator has more than one output port ({', '.join(op_input_labels)}) "
f"The destination operator has more than one output port ({', '.join(op_input_labels)}) "
f"so mapping should be specified explicitly!"
)
io_map = {"": {""}}
@@ -221,14 +221,14 @@ def add_flow(
if isinstance(v, str):
io_maps[k] = {v}

- # Verify that the upstream & downstream operator have the input and output ports specified by the io_map
+ # Verify that the source & destination operators have the input and output ports specified by the io_map
output_labels = list(io_maps.keys())

if len(op_output_labels) == 1 and len(output_labels) != 1:
raise IOMappingError(
f"The upstream operator({upstream_op.name}) has only one port with label "
f"The source operator({source_op.name}) has only one port with label "
f"'{next(iter(op_output_labels))}' but io_map specifies {len(output_labels)} "
f"labels({', '.join(output_labels)}) to the upstream operator's output port"
f"labels({', '.join(output_labels)}) to the source operator's output port"
)

for output_label in output_labels:
@@ -239,7 +239,7 @@ def add_flow(
del io_maps[output_label]
break
raise IOMappingError(
f"The upstream operator({upstream_op.name}) has no output port with label '{output_label}'. "
f"The source operator({source_op.name}) has no output port with label '{output_label}'. "
f"It should be one of ({', '.join(op_output_labels)})."
)

@@ -249,9 +249,9 @@ def add_flow(

if len(op_input_labels) == 1 and len(input_labels) != 1:
raise IOMappingError(
f"The downstream operator({downstream_op.name}) has only one port with label "
f"The destination operator({destination_op.name}) has only one port with label "
f"'{next(iter(op_input_labels))}' but io_map specifies {len(input_labels)} "
f"labels({', '.join(input_labels)}) to the downstream operator's input port"
f"labels({', '.join(input_labels)}) to the destination operator's input port"
)

for input_label in input_labels:
@@ -262,11 +262,11 @@ def add_flow(
input_labels.add(next(iter(op_input_labels)))
break
raise IOMappingError(
f"The downstream operator({downstream_op.name}) has no input port with label '{input_label}'. "
f"The destination operator({destination_op.name}) has no input port with label '{input_label}'. "
f"It should be one of ({', '.join(op_input_labels)})."
)

- self._graph.add_flow(upstream_op, downstream_op, io_maps)
+ self._graph.add_flow(source_op, destination_op, io_maps)

def get_package_info(self, model_path: Union[str, Path] = "") -> Dict:
"""Returns the package information of this application.
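A hedged sketch of the validation implemented above (hypothetical operators; the `IOMappingError` import path is assumed): when the source operator exposes more than one output port, `add_flow()` without an explicit `io_map` raises, and spelling out the mapping resolves the ambiguity.

```python
import numpy as np

import monai.deploy.core as md
from monai.deploy.core import Application, IOType, Operator
from monai.deploy.exceptions import IOMappingError  # assumed import path


@md.output("image", np.ndarray, IOType.IN_MEMORY)
@md.output("mask", np.ndarray, IOType.IN_MEMORY)
class TwoOutputOp(Operator):  # hypothetical: two output ports
    def compute(self, op_input, op_output, context):
        op_output.set(np.zeros((2, 2)), "image")
        op_output.set(np.ones((2, 2)), "mask")


@md.input("image", np.ndarray, IOType.IN_MEMORY)
class OneInputOp(Operator):  # hypothetical: single input port
    def compute(self, op_input, op_output, context):
        print(op_input.get("image"))


class AmbiguousApp(Application):
    def compose(self):
        src, dst = TwoOutputOp(), OneInputOp()
        try:
            self.add_flow(src, dst)  # two output ports, no io_map -> IOMappingError
        except IOMappingError:
            self.add_flow(src, dst, {"mask": "image"})  # explicit mapping succeeds
```
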
6 changes: 3 additions & 3 deletions monai/deploy/core/executors/multi_process_executor.py
@@ -38,10 +38,10 @@
# # Figure out how to deal with duplicate nodes
# q.put(e[1])
# edge_data = g.get_edge_data(e[0], e[1])
- # output = node.get_output(edge_data["upstream_op_port"])
- # key1 = (e[0].get_uid(), "output", edge_data["upstream_op_port"])
+ # output = node.get_output(edge_data["source_op_port"])
+ # key1 = (e[0].get_uid(), "output", edge_data["source_op_port"])
# self._storage.store(key1, output)
- # key2 = (e[1].get_uid(), "input", edge_data["downstream_op_port"])
+ # key2 = (e[1].get_uid(), "input", edge_data["destination_op_port"])
# self._storage.store(key2, output)

# def _launch_operator(self, op):
6 changes: 3 additions & 3 deletions monai/deploy/core/executors/multi_threaded_executor.py
@@ -38,8 +38,8 @@
# # Figure out how to deal with duplicate nodes
# q.put(e[1])
# edge_data = g.get_edge_data(e[0], e[1])
- # output = node.get_output(edge_data["upstream_op_port"])
- # key1 = (e[0].get_uid(), "output", edge_data["upstream_op_port"])
+ # output = node.get_output(edge_data["source_op_port"])
+ # key1 = (e[0].get_uid(), "output", edge_data["source_op_port"])
# self._storage.store(key1, output)
- # key2 = (e[1].get_uid(), "input", edge_data["downstream_op_port"])
+ # key2 = (e[1].get_uid(), "input", edge_data["destination_op_port"])
# self._storage.store(key2, output)
4 changes: 2 additions & 2 deletions monai/deploy/operators/monai_bundle_inference_operator.py
@@ -215,7 +215,7 @@ class MonaiBundleInferenceOperator(InferenceOperator):
For image input and output, the type is the `Image` class. For output of probabilities, the type is `Dict`.
- This operator is expected to be linked with both upstream and downstream operators, e.g. receiving an `Image` object from
+ This operator is expected to be linked with both source and destination operators, e.g. receiving an `Image` object from
the `DICOMSeriesToVolumeOperator`, and passing a segmentation `Image` to the `DICOMSegmentationWriterOperator`.
In such cases, the I/O storage type can only be `IN_MEMORY` due to the restrictions imposed by the application executor.
However, when used as the first operator in an application, its input storage type needs to be `DISK`, and the file needs
@@ -618,7 +618,7 @@ def _send_output(self, value: Any, name: str, metadata: Dict, op_output: OutputContext):
# out of the MONAI post processing is [CWHD] with dim for batch already squeezed out.
# Prediction image, e.g. segmentation image, needs to have its dimensions
# rearranged to fit the conventions used by Image class, i.e. [DHW], without channel dim.
- # Also, based on known use cases, e.g. prediction being seg image and the downstream
+ # Also, based on known use cases, e.g. prediction being seg image and the destination
# operators expect the data type to be uint8, conversion needs to be done as well.
# Metadata, such as pixel spacing and orientation, also needs to be set in the Image object,
# which is why metadata is expected to be passed in.
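A small NumPy illustration (assuming a single-channel prediction; the shapes are made up) of the dimension handling the comment above describes: squeeze the channel out of a `[C, W, H, D]` prediction, reverse the axes to `[D, H, W]`, and cast to `uint8`.

```python
import numpy as np

pred_cwhd = np.random.rand(1, 512, 512, 77)   # hypothetical [C, W, H, D] model output
seg_whd = pred_cwhd.squeeze(0)                # drop the channel dim -> [W, H, D]
seg_dhw = np.transpose(seg_whd, (2, 1, 0))    # reorder to the Image convention [D, H, W]
seg_u8 = (seg_dhw > 0.5).astype(np.uint8)     # threshold and convert to uint8
print(seg_u8.shape, seg_u8.dtype)             # (77, 512, 512) uint8
```
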
4 changes: 2 additions & 2 deletions notebooks/tutorials/01_simple_app.ipynb
@@ -341,11 +341,11 @@
"In `compose()` method, objects of `SobelOperator`, `MedianOperator`, and `GaussianOperator` classes are created\n",
"and connected through <a href=\"../../modules/_autosummary/monai.deploy.core.Application.html#monai.deploy.core.Application.add_flow\">self.add_flow()</a>.\n",
"\n",
"> add_flow(upstream_op, downstream_op, io_map=None)\n",
"> add_flow(source_op, destination_op, io_map=None)\n",
"\n",
"`io_map` is a dictionary of mapping from the source operator's label to the destination operator's label(s) and its type is `Dict[str, str|Set[str]]`. \n",
"\n",
"We can skip specifying `io_map` if both the number of `upstream_op`'s outputs and the number of `downstream_op`'s inputs are one so `self.add_flow(sobel_op, median_op)` is same with `self.add_flow(sobel_op, median_op, {\"image\": \"image\"})` or `self.add_flow(sobel_op, median_op, {\"image\": {\"image\"}})`.\n"
"We can skip specifying `io_map` if both the number of `source_op`'s outputs and the number of `destination_op`'s inputs are one so `self.add_flow(sobel_op, median_op)` is same with `self.add_flow(sobel_op, median_op, {\"image\": \"image\"})` or `self.add_flow(sobel_op, median_op, {\"image\": {\"image\"}})`.\n"
]
},
{
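For reference, a hedged reconstruction of the `compose()` that the tutorial cell above describes, assuming the `SobelOperator`, `MedianOperator`, and `GaussianOperator` classes defined earlier in that notebook; each has a single `image` port on either side, so `io_map` is implied.

```python
from monai.deploy.core import Application


class App(Application):
    def compose(self):
        sobel_op = SobelOperator()        # defined earlier in the tutorial
        median_op = MedianOperator()      # defined earlier in the tutorial
        gaussian_op = GaussianOperator()  # defined earlier in the tutorial
        self.add_flow(sobel_op, median_op)     # same as io_map={"image": "image"}
        self.add_flow(median_op, gaussian_op)  # single ports, mapping implied
```
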
8 changes: 4 additions & 4 deletions notebooks/tutorials/06_monai_bundle_app.ipynb
@@ -268,14 +268,14 @@
" # Create DICOM Seg writer with segment label name in a string list\n",
" dicom_seg_writer = DICOMSegmentationWriterOperator(seg_labels=[\"Spleen\"])\n",
"\n",
" # Create the processing pipeline, by specifying the upstream and downstream operators, and\n",
" # Create the processing pipeline, by specifying the source and destination operators, and\n",
" # ensuring the output from the former matches the input of the latter, in both name and type.\n",
" self.add_flow(study_loader_op, series_selector_op, {\"dicom_study_list\": \"dicom_study_list\"})\n",
" self.add_flow(\n",
" series_selector_op, series_to_vol_op, {\"study_selected_series_list\": \"study_selected_series_list\"}\n",
" )\n",
" self.add_flow(series_to_vol_op, bundle_spleen_seg_op, {\"image\": \"image\"})\n",
" # Note below the dicom_seg_writer requires two inputs, each coming from a upstream operator.\n",
" # Note below the dicom_seg_writer requires two inputs, each coming from a source operator.\n",
" self.add_flow(\n",
" series_selector_op, dicom_seg_writer, {\"study_selected_series_list\": \"study_selected_series_list\"}\n",
" )\n",
@@ -477,14 +477,14 @@
" # Create DICOM Seg writer with segment label name in a string list\n",
" dicom_seg_writer = DICOMSegmentationWriterOperator(seg_labels=[\"Spleen\"])\n",
"\n",
" # Create the processing pipeline, by specifying the upstream and downstream operators, and\n",
" # Create the processing pipeline, by specifying the source and destination operators, and\n",
" # ensuring the output from the former matches the input of the latter, in both name and type.\n",
" self.add_flow(study_loader_op, series_selector_op, {\"dicom_study_list\": \"dicom_study_list\"})\n",
" self.add_flow(\n",
" series_selector_op, series_to_vol_op, {\"study_selected_series_list\": \"study_selected_series_list\"}\n",
" )\n",
" self.add_flow(series_to_vol_op, bundle_spleen_seg_op, {\"image\": \"image\"})\n",
" # Note below the dicom_seg_writer requires two inputs, each coming from a upstream operator.\n",
" # Note below the dicom_seg_writer requires two inputs, each coming from a source operator.\n",
" self.add_flow(\n",
" series_selector_op, dicom_seg_writer, {\"study_selected_series_list\": \"study_selected_series_list\"}\n",
" )\n",

