diff --git a/dali/python/nvidia/dali/_debug_mode.py b/dali/python/nvidia/dali/_debug_mode.py index 482784333c..881e5f68fe 100644 --- a/dali/python/nvidia/dali/_debug_mode.py +++ b/dali/python/nvidia/dali/_debug_mode.py @@ -1,4 +1,4 @@ -# Copyright (c) 2021-2023, NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# Copyright (c) 2021-2024, NVIDIA CORPORATION & AFFILIATES. All rights reserved. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -790,8 +790,7 @@ def run(self): """Run the pipeline and return the result.""" import numpy as np - if not self._built: - raise RuntimeError("Pipeline must be built first.") + self._ensure_built() self._debug_on = True self._cur_operator_id = -1 @@ -834,8 +833,7 @@ def feed_input(self, data_node, data, **kwargs): """Pass data to an ExternalSource operator inside the pipeline. Refer to :meth:`Pipeline.feed_input() ` for details.""" - if not self._built: - raise RuntimeError("Pipeline must be built first.") + self._ensure_built() if isinstance(data_node, str): name = data_node else: diff --git a/dali/python/nvidia/dali/pipeline.py b/dali/python/nvidia/dali/pipeline.py index d7dfbbc6b8..cb413317a4 100644 --- a/dali/python/nvidia/dali/pipeline.py +++ b/dali/python/nvidia/dali/pipeline.py @@ -500,8 +500,7 @@ def epoch_size(self, name=None): The reader which should be used to obtain epoch size. """ - if not self._built: - raise RuntimeError("Pipeline must be built first.") + self._ensure_built() if name is not None: return self._pipe.reader_meta(name)["epoch_size_padded"] meta = self._pipe.reader_meta() @@ -529,8 +528,7 @@ def executor_statistics(self): .. note:: Executor statistics are not available when using ``exec_dynamic=True``. """ - if not self._built: - raise RuntimeError("Pipeline must be built first.") + self._ensure_built() return self._pipe.executor_statistics() def external_source_shm_statistics(self): @@ -590,8 +588,7 @@ def reader_meta(self, name=None): name : str, optional, default = None The reader which should be used to obtain shards_number. """ - if not self._built: - raise RuntimeError("Pipeline must be built first.") + self._ensure_built() if name is not None: return self._pipe.reader_meta(name) return self._pipe.reader_meta() @@ -1042,10 +1039,19 @@ def _next_op_id(self): return i def build(self): - """Build the pipeline. + """Build the pipeline (optional step). - Pipeline needs to be built in order to run it standalone. - Framework-specific plugins handle this step automatically. + Pipeline is automatically built when Pipeline is: + + * run, either via the run APIs (:meth:`run`, :meth:`schedule_run`), + or the framework-specific plugins, + * the inputs are provided via :meth:`feed_input` + * the pipeline metadata is accessed (:meth:`epoch_size`, :meth:`reader_meta`) + * outputs are accessed - including :meth:`output_stream` + * the graph needs to be otherwise materialized - like :meth:`save_graph_to_dot_file`. + + If needed, the :meth:`build` can be invoked ahead, allowing to separate the graph building + and the processing steps. """ if self._built: return @@ -1064,6 +1070,12 @@ def build(self): self._restore_state_from_checkpoint() self._built = True + def _ensure_built(self): + """Ensure that the Pipeline is built before proceeding further. Allows to make the + pipeline.build() optional.""" + if not self._built: + self.build() + def input_feed_count(self, input_name): return self._pipe.InputFeedCount(input_name) @@ -1142,8 +1154,7 @@ def feed_input(self, data_node, data, layout=None, cuda_stream=None, use_copy_ke If set to True, DALI will use a CUDA kernel to feed the data (only applicable when copying data to/from GPU memory) instead of ``cudaMemcpyAsync`` (default). """ - if not self._built: - raise RuntimeError("Pipeline must be built first.") + self._ensure_built() if isinstance(data_node, str): name = data_node else: @@ -1226,8 +1237,7 @@ def schedule_run(self): def output_stream(self): """Returns the internal CUDA stream on which the outputs are produced.""" - if not self._built: - raise RuntimeError("Pipeline must be built first.") + self._ensure_built() return self._pipe.GetOutputStream() # for the backward compatibility @@ -1296,8 +1306,7 @@ def release_outputs(self): When using dynamic executor (``exec_dynamic=True``), the buffers are not invalidated. """ with self._check_api_type_scope(types.PipelineAPIType.SCHEDULED): - if not self._built: - raise RuntimeError("Pipeline must be built first.") + self._ensure_built() ret = self._pipe.ReleaseOutputs() return ret @@ -1312,8 +1321,7 @@ def _outputs(self, cuda_stream=None): Calling this function is equivalent to calling release_outputs then calling share_outputs""" - if not self._built: - raise RuntimeError("Pipeline must be built first.") + self._ensure_built() return self._pipe.Outputs(types._raw_cuda_stream_ptr(cuda_stream)) def _are_pipeline_inputs_possible(self): @@ -1361,7 +1369,6 @@ def my_pipe(): :meth:`run()` function:: p = my_pipe(prefetch_queue_depth=1, ...) - p.build() p.run(my_inp=np.random((2,3,2))) Such keyword argument specified in the :meth:`run()` function has to have a @@ -1400,8 +1407,7 @@ def my_pipe(): def _prefetch(self): """Executes pipeline to fill executor's pipeline.""" - if not self._built: - raise RuntimeError("Pipeline must be built first.") + self._ensure_built() if not self._pipe: raise RuntimeError("The pipeline was destroyed.") self._schedule_py_workers() @@ -1665,8 +1671,7 @@ def save_graph_to_dot_file( use_colors : bool Whether use color to distinguish stages """ - if not self._built: - raise RuntimeError("Pipeline must be built first.") + self._ensure_built() if show_ids is not None: with warnings.catch_warnings(): warnings.simplefilter("default") @@ -2042,7 +2047,7 @@ def my_pipe(flip_vertical, flip_horizontal): The decorated function returns a DALI Pipeline object:: pipe = my_pipe(True, False) - # pipe.build() # the pipeline is not configured properly yet + # pipe.run() # the pipeline is not configured properly yet A pipeline requires additional parameters such as batch size, number of worker threads, GPU device id and so on (see :meth:`nvidia.dali.Pipeline()` for a @@ -2051,9 +2056,9 @@ def my_pipe(flip_vertical, flip_horizontal): passed to the decorated function:: pipe = my_pipe(True, False, batch_size=32, num_threads=1, device_id=0) - pipe.build() # the pipeline is properly configured, we can build it now - The outputs from the original function became the outputs of the Pipeline:: + The pipeline is properly configured, we can run it now. The outputs from the original function + became the outputs of the Pipeline:: flipped, img = pipe.run() diff --git a/dali/test/python/test_pipeline.py b/dali/test/python/test_pipeline.py index a64f481c15..87e6770b0f 100644 --- a/dali/test/python/test_pipeline.py +++ b/dali/test/python/test_pipeline.py @@ -2333,3 +2333,32 @@ def def_ref(): (ref,) = ref_pipe.run() check_batch(cpu, ref, bs, 0, 0, "HWC") check_batch(gpu, ref, bs, 0, 0, "HWC") + + +def test_optional_build(): + bs = 8 + + @pipeline_def(batch_size=bs, num_threads=4, device_id=0) + def pdef_regular(): + enc, _ = fn.readers.file(file_root=jpeg_folder, name="only_reader") + img = fn.decoders.image(enc, device="mixed") + return img + + @pipeline_def(batch_size=bs, num_threads=4, device_id=0) + def pdef_source(): + source = fn.external_source(name="es") + return source + + pipes = [pdef_regular() for _ in range(5)] + + pipes[0].run() + pipes[1].schedule_run() + assert pipes[2].epoch_size("only_reader") != 0 + assert pipes[3].executor_statistics() == {} + assert "shard_id" in pipes[4].reader_meta("only_reader") + + pipes.append(pdef_source()) + pipes[-1].feed_input("source", np.array([10, 10])) + + for pipe in pipes: + assert pipe._built diff --git a/docs/examples/getting_started.ipynb b/docs/examples/getting_started.ipynb index 3f57298ebe..81d05fe927 100644 --- a/docs/examples/getting_started.ipynb +++ b/docs/examples/getting_started.ipynb @@ -86,7 +86,7 @@ "source": [ "### Building the Pipeline\n", "\n", - "In order to use the pipeline defined with `simple_pipeline`, we need to create and build it. This is achieved by calling `simple_pipeline`, which creates an instance of the pipeline. Then we call `build` on this newly created instance:" + "In order to use the pipeline defined with `simple_pipeline`, we need to instantiate it. This is achieved by calling `simple_pipeline`, which creates an instance of the pipeline." ] }, { @@ -95,8 +95,7 @@ "metadata": {}, "outputs": [], "source": [ - "pipe = simple_pipeline(batch_size=max_batch_size, num_threads=1, device_id=0)\n", - "pipe.build()" + "pipe = simple_pipeline(batch_size=max_batch_size, num_threads=1, device_id=0)" ] }, { @@ -119,7 +118,7 @@ "source": [ "### Running the Pipeline\n", "\n", - "After the pipeline is built, we can run it to get a batch of results." + "After the pipeline instance is created, we can run it to get a batch of results." ] }, { @@ -438,8 +437,7 @@ "source": [ "pipe = shuffled_pipeline(\n", " batch_size=max_batch_size, num_threads=1, device_id=0, seed=1234\n", - ")\n", - "pipe.build()" + ")" ] }, { @@ -604,8 +602,7 @@ "source": [ "pipe = random_rotated_pipeline(\n", " batch_size=max_batch_size, num_threads=1, device_id=0, seed=1234\n", - ")\n", - "pipe.build()" + ")" ] }, {