Skip to content

Commit

Permalink
Make build() optional
Browse files Browse the repository at this point in the history
Signed-off-by: Krzysztof Lecki <[email protected]>
  • Loading branch information
klecki committed Dec 17, 2024
1 parent 8535a3b commit 0ab3715
Show file tree
Hide file tree
Showing 4 changed files with 67 additions and 38 deletions.
8 changes: 3 additions & 5 deletions dali/python/nvidia/dali/_debug_mode.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Copyright (c) 2021-2023, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# Copyright (c) 2021-2024, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -790,8 +790,7 @@ def run(self):
"""Run the pipeline and return the result."""
import numpy as np

if not self._built:
raise RuntimeError("Pipeline must be built first.")
self._ensure_built()

self._debug_on = True
self._cur_operator_id = -1
Expand Down Expand Up @@ -834,8 +833,7 @@ def feed_input(self, data_node, data, **kwargs):
"""Pass data to an ExternalSource operator inside the pipeline.
Refer to :meth:`Pipeline.feed_input() <nvidia.dali.Pipeline.feed_input>` for details."""
if not self._built:
raise RuntimeError("Pipeline must be built first.")
self._ensure_built()
if isinstance(data_node, str):
name = data_node
else:
Expand Down
55 changes: 30 additions & 25 deletions dali/python/nvidia/dali/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -500,8 +500,7 @@ def epoch_size(self, name=None):
The reader which should be used to obtain epoch size.
"""

if not self._built:
raise RuntimeError("Pipeline must be built first.")
self._ensure_built()
if name is not None:
return self._pipe.reader_meta(name)["epoch_size_padded"]
meta = self._pipe.reader_meta()
Expand Down Expand Up @@ -529,8 +528,7 @@ def executor_statistics(self):
.. note::
Executor statistics are not available when using ``exec_dynamic=True``.
"""
if not self._built:
raise RuntimeError("Pipeline must be built first.")
self._ensure_built()
return self._pipe.executor_statistics()

def external_source_shm_statistics(self):
Expand Down Expand Up @@ -590,8 +588,7 @@ def reader_meta(self, name=None):
name : str, optional, default = None
The reader which should be used to obtain shards_number.
"""
if not self._built:
raise RuntimeError("Pipeline must be built first.")
self._ensure_built()
if name is not None:
return self._pipe.reader_meta(name)
return self._pipe.reader_meta()
Expand Down Expand Up @@ -1042,10 +1039,19 @@ def _next_op_id(self):
return i

def build(self):
"""Build the pipeline.
"""Build the pipeline (optional step).
Pipeline needs to be built in order to run it standalone.
Framework-specific plugins handle this step automatically.
Pipeline is automatically built when Pipeline is:
* run, either via the run APIs (:meth:`run`, :meth:`schedule_run`),
or the framework-specific plugins,
* the inputs are provided via :meth:`feed_input`
* the pipeline metadata is accessed (:meth:`epoch_size`, :meth:`reader_meta`)
* outputs are accessed - including :meth:`output_stream`
* the graph needs to be otherwise materialized - like :meth:`save_graph_to_dot_file`.
If needed, the :meth:`build` can be invoked ahead, allowing to separate the graph building
and the processing steps.
"""
if self._built:
return
Expand All @@ -1064,6 +1070,12 @@ def build(self):
self._restore_state_from_checkpoint()
self._built = True

def _ensure_built(self):
"""Ensure that the Pipeline is built before proceeding further. Allows to make the
pipeline.build() optional."""
if not self._built:
self.build()

def input_feed_count(self, input_name):
return self._pipe.InputFeedCount(input_name)

Expand Down Expand Up @@ -1142,8 +1154,7 @@ def feed_input(self, data_node, data, layout=None, cuda_stream=None, use_copy_ke
If set to True, DALI will use a CUDA kernel to feed the data (only applicable
when copying data to/from GPU memory) instead of ``cudaMemcpyAsync`` (default).
"""
if not self._built:
raise RuntimeError("Pipeline must be built first.")
self._ensure_built()
if isinstance(data_node, str):
name = data_node
else:
Expand Down Expand Up @@ -1226,8 +1237,7 @@ def schedule_run(self):

def output_stream(self):
"""Returns the internal CUDA stream on which the outputs are produced."""
if not self._built:
raise RuntimeError("Pipeline must be built first.")
self._ensure_built()
return self._pipe.GetOutputStream()

# for the backward compatibility
Expand Down Expand Up @@ -1296,8 +1306,7 @@ def release_outputs(self):
When using dynamic executor (``exec_dynamic=True``), the buffers are not invalidated.
"""
with self._check_api_type_scope(types.PipelineAPIType.SCHEDULED):
if not self._built:
raise RuntimeError("Pipeline must be built first.")
self._ensure_built()
ret = self._pipe.ReleaseOutputs()
return ret

Expand All @@ -1312,8 +1321,7 @@ def _outputs(self, cuda_stream=None):
Calling this function is equivalent to calling release_outputs
then calling share_outputs"""
if not self._built:
raise RuntimeError("Pipeline must be built first.")
self._ensure_built()
return self._pipe.Outputs(types._raw_cuda_stream_ptr(cuda_stream))

def _are_pipeline_inputs_possible(self):
Expand Down Expand Up @@ -1361,7 +1369,6 @@ def my_pipe():
:meth:`run()` function::
p = my_pipe(prefetch_queue_depth=1, ...)
p.build()
p.run(my_inp=np.random((2,3,2)))
Such keyword argument specified in the :meth:`run()` function has to have a
Expand Down Expand Up @@ -1400,8 +1407,7 @@ def my_pipe():

def _prefetch(self):
"""Executes pipeline to fill executor's pipeline."""
if not self._built:
raise RuntimeError("Pipeline must be built first.")
self._ensure_built()
if not self._pipe:
raise RuntimeError("The pipeline was destroyed.")
self._schedule_py_workers()
Expand Down Expand Up @@ -1665,8 +1671,7 @@ def save_graph_to_dot_file(
use_colors : bool
Whether use color to distinguish stages
"""
if not self._built:
raise RuntimeError("Pipeline must be built first.")
self._ensure_built()
if show_ids is not None:
with warnings.catch_warnings():
warnings.simplefilter("default")
Expand Down Expand Up @@ -2042,7 +2047,7 @@ def my_pipe(flip_vertical, flip_horizontal):
The decorated function returns a DALI Pipeline object::
pipe = my_pipe(True, False)
# pipe.build() # the pipeline is not configured properly yet
# pipe.run() # the pipeline is not configured properly yet
A pipeline requires additional parameters such as batch size, number of worker threads,
GPU device id and so on (see :meth:`nvidia.dali.Pipeline()` for a
Expand All @@ -2051,9 +2056,9 @@ def my_pipe(flip_vertical, flip_horizontal):
passed to the decorated function::
pipe = my_pipe(True, False, batch_size=32, num_threads=1, device_id=0)
pipe.build() # the pipeline is properly configured, we can build it now
The outputs from the original function became the outputs of the Pipeline::
The pipeline is properly configured, we can run it now. The outputs from the original function
became the outputs of the Pipeline::
flipped, img = pipe.run()
Expand Down
29 changes: 29 additions & 0 deletions dali/test/python/test_pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -2333,3 +2333,32 @@ def def_ref():
(ref,) = ref_pipe.run()
check_batch(cpu, ref, bs, 0, 0, "HWC")
check_batch(gpu, ref, bs, 0, 0, "HWC")


def test_optional_build():
bs = 8

@pipeline_def(batch_size=bs, num_threads=4, device_id=0)
def pdef_regular():
enc, _ = fn.readers.file(file_root=jpeg_folder, name="only_reader")
img = fn.decoders.image(enc, device="mixed")
return img

@pipeline_def(batch_size=bs, num_threads=4, device_id=0)
def pdef_source():
source = fn.external_source(name="es")
return source

pipes = [pdef_regular() for _ in range(5)]

pipes[0].run()
pipes[1].schedule_run()
assert pipes[2].epoch_size("only_reader") != 0
assert pipes[3].executor_statistics() == {}
assert "shard_id" in pipes[4].reader_meta("only_reader")

pipes.append(pdef_source())
pipes[-1].feed_input("source", np.array([10, 10]))

for pipe in pipes:
assert pipe._built
13 changes: 5 additions & 8 deletions docs/examples/getting_started.ipynb
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@
"source": [
"### Building the Pipeline\n",
"\n",
"In order to use the pipeline defined with `simple_pipeline`, we need to create and build it. This is achieved by calling `simple_pipeline`, which creates an instance of the pipeline. Then we call `build` on this newly created instance:"
"In order to use the pipeline defined with `simple_pipeline`, we need to instantiate it. This is achieved by calling `simple_pipeline`, which creates an instance of the pipeline."
]
},
{
Expand All @@ -95,8 +95,7 @@
"metadata": {},
"outputs": [],
"source": [
"pipe = simple_pipeline(batch_size=max_batch_size, num_threads=1, device_id=0)\n",
"pipe.build()"
"pipe = simple_pipeline(batch_size=max_batch_size, num_threads=1, device_id=0)"
]
},
{
Expand All @@ -119,7 +118,7 @@
"source": [
"### Running the Pipeline\n",
"\n",
"After the pipeline is built, we can run it to get a batch of results."
"After the pipeline instance is created, we can run it to get a batch of results."
]
},
{
Expand Down Expand Up @@ -438,8 +437,7 @@
"source": [
"pipe = shuffled_pipeline(\n",
" batch_size=max_batch_size, num_threads=1, device_id=0, seed=1234\n",
")\n",
"pipe.build()"
")"
]
},
{
Expand Down Expand Up @@ -604,8 +602,7 @@
"source": [
"pipe = random_rotated_pipeline(\n",
" batch_size=max_batch_size, num_threads=1, device_id=0, seed=1234\n",
")\n",
"pipe.build()"
")"
]
},
{
Expand Down

0 comments on commit 0ab3715

Please sign in to comment.