From 7d482742fdde4ee0af4f5096db23fcfea5b76acb Mon Sep 17 00:00:00 2001
From: Alisson Gusatti Azzolini
Date: Tue, 20 Jun 2017 22:20:45 -0700
Subject: [PATCH] Allow tasks/execution_steps to be cloned at runtime

Summary:
Advantages of cloning the tasks/execution_steps at runtime:
- Less complexity on the Python side: no need to clone nets and add prefixes to blob names
- Faster start-up: we had cases of complex plans that took up to 30min to be created.
- Better isolation: each task cloned at runtime has its own child workspace, preventing false sharing of blobs.
- Opens up the possibility of dynamic scheduling: the number of threads per task can be increased on the fly, at runtime.

Reviewed By: dzhulgakov

Differential Revision: D5100730

fbshipit-source-id: 71b83193b135da4e6eaf2536d8fc266528e1fdcc
---
 caffe2/core/plan_executor.cc      | 300 +++++++++++++++++++++---------
 caffe2/proto/caffe2.proto         |   7 +
 caffe2/python/checkpoint_test.py  |   5 +-
 caffe2/python/core.py             |  16 +-
 caffe2/python/dataio_test.py      |  85 +++++++--
 caffe2/python/net_builder.py      |  44 ++++-
 caffe2/python/net_builder_test.py |  94 +++++++++-
 caffe2/python/net_printer.py      |  52 ++++--
 caffe2/python/net_printer_test.py |  19 +-
 caffe2/python/pipeline.py         | 129 ++++++++++---
 caffe2/python/pybind_state.cc     |   8 +-
 caffe2/python/session.py          |   5 +-
 caffe2/python/task.py             | 135 ++++++++++----
 13 files changed, 704 insertions(+), 195 deletions(-)

diff --git a/caffe2/core/plan_executor.cc b/caffe2/core/plan_executor.cc
index d4f2626ddc035..d9de6932e60c6 100644
--- a/caffe2/core/plan_executor.cc
+++ b/caffe2/core/plan_executor.cc
@@ -4,6 +4,7 @@
 #include
 #include
 #include
+#include
 #include
 #include "caffe2/core/timer.h"
@@ -20,6 +21,16 @@ namespace caffe2 {
 namespace {
 
+struct NetDefInfo {
+  const NetDef* netDef;
+  // In order to keep the "override existing nets" behavior on the top-level
+  // workflow, we need to mark the nets that already exist so that we can
+  // override them exactly once.
+  bool needsOverride;
+};
+
+using NetDefMap = std::unordered_map<std::string, NetDefInfo>;
+
 struct Reporter {
   struct ReporterInstance {
     std::mutex report_mutex;
@@ -98,24 +109,129 @@ inline const bool getShouldStop(const Blob* b) {
   return *(t.template data<bool>());
 }
 
+struct CompiledExecutionStep;
+
+/**
+ * Controls compilation and runtime cloning of execution steps.
+ *
+ * If step.create_workspace=False, this wrapper will compile the execution step
+ * and its children once, and calls to ExecutionStepWrapper::compiled() will
+ * always return the same compiled step.
+ * If step.create_workspace=True, no compilation is done at creation time.
+ * Instead, a new CompiledExecutionStep is created for every compiled() call.
+ *
+ * CompiledExecutionStep owns its Workspace, and the lifetime of the
+ * compiled step along with its workspace will be tied to the lifetime of
+ * the `CompiledGuard` object returned by compiled().
+ *
+ * ExecuteStepRecursive will call compiled() once before the given
+ * execution step is run and keep it alive for the length of its execution.
+ * This means that, for steps with create_workspace=true, a child workspace
+ * will be created every time the step is executed, and destroyed right
+ * afterwards.
+ */ +struct ExecutionStepWrapper { + ExecutionStepWrapper( + const ExecutionStep* step, + Workspace* externalWorkspace, + ShouldContinue externalShouldContinue, + NetDefMap* netDefs) + : step_(step), + externalWorkspace_(externalWorkspace), + externalShouldContinue_(externalShouldContinue), + netDefs_(netDefs) { + // If this execution step does not create a child workspace, + // then just eagerly-compile it. This will trigger CreateNet on the + // nets used by this execution step. + if (!step_->create_workspace()) { + compiledStep_ = doCompile(); + } + } + + class CompiledGuard { + void reset(std::unique_ptr&& compiled) { + compiled_ = std::move(compiled); + compiledRef_ = compiled_.get(); + } + void reset(CompiledExecutionStep* compiledRef) { + compiled_.reset(); + compiledRef_ = compiledRef; + } + + public: + CompiledExecutionStep* operator->() { + return compiledRef_; + } + + private: + CompiledGuard() {} + std::unique_ptr compiled_; + CompiledExecutionStep* compiledRef_; + friend class ExecutionStepWrapper; + }; + + const ExecutionStep& step() { + return *step_; + } + + CompiledGuard compiled() { + CompiledGuard guard; + if (compiledStep_) { + guard.reset(compiledStep_.get()); + } else { + guard.reset(doCompile()); + } + return guard; + } + + private: + std::unique_ptr doCompile(); + + const ExecutionStep* step_; + Workspace* externalWorkspace_; + ShouldContinue externalShouldContinue_; + NetDefMap* netDefs_; + std::unique_ptr compiledStep_; +}; + struct CompiledExecutionStep { typedef std::function ShouldContinue; CompiledExecutionStep( const ExecutionStep* mainStep, - Workspace* ws, - ShouldContinue externalShouldContinue) - : workspace(ws), step(mainStep) { + Workspace* externalWorkspace, + ShouldContinue externalShouldContinue, + NetDefMap* netDefs) + : step(mainStep) { + if (mainStep->create_workspace()) { + localWorkspace_.reset(new Workspace(externalWorkspace)); + workspace = localWorkspace_.get(); + } else { + workspace = externalWorkspace; + } + CAFFE_ENFORCE( (step->substep_size() == 0 || step->network_size() == 0), "An ExecutionStep should either have substep or networks" "but not both."); - if (step->has_should_stop_blob()) { - shouldStop = ws->GetBlob(step->should_stop_blob()); + auto createAndGetNet = [&](const std::string& network_name) { + auto it = netDefs->find(network_name); CAFFE_ENFORCE( - shouldStop, "blob ", step->should_stop_blob(), " does not exist"); - } + it != netDefs->end(), + "ExecutionStep " + mainStep->name() + " uses undefined net " + + network_name); + // needsOverride does not need synchronization because it is only + // relevant for non-dynamic executions steps. This is due to the fact + // that concurrent nets run on child workspaces, that do not needOverride. 
+ if (it->second.needsOverride || !workspace->GetNet(network_name)) { + workspace->CreateNet(*it->second.netDef, true); + it->second.needsOverride = false; + } + auto* net = workspace->GetNet(network_name); + CAFFE_ENFORCE(net != nullptr, "Network ", network_name, " not found."); + return net; + }; if (step->substep_size()) { ShouldContinue substepShouldContinue; @@ -128,8 +244,8 @@ struct CompiledExecutionStep { } for (const auto& ss : step->substep()) { - auto compiledSubstep = std::make_shared( - &ss, ws, substepShouldContinue); + auto compiledSubstep = std::make_shared( + &ss, workspace, substepShouldContinue, netDefs); if (ss.has_run_every_ms()) { reportSubsteps.push_back(compiledSubstep); } else { @@ -138,30 +254,52 @@ struct CompiledExecutionStep { } } else { for (const string& network_name : step->network()) { - auto* net = ws->GetNet(network_name); - CAFFE_ENFORCE(net != nullptr, "Network ", network_name, " not found."); - networks.push_back(net); + networks.push_back(createAndGetNet(network_name)); } } - netShouldContinue = getContinuationTest(ws, *step); + if (step->has_should_stop_blob()) { + shouldStop = workspace->GetBlob(step->should_stop_blob()); + CAFFE_ENFORCE( + shouldStop, "blob ", step->should_stop_blob(), " does not exist"); + } + + if (step->has_report_net()) { + CAFFE_ENFORCE( + step->has_report_interval(), + "A report_interval must be provided if report_net is set."); + reportNet = createAndGetNet(step->report_net()); + } else { + reportNet = nullptr; + } + + netShouldContinue = getContinuationTest(workspace, *step); shouldContinue = [this, externalShouldContinue](int64_t iter) { return externalShouldContinue(iter) && this->netShouldContinue(iter); }; } - Workspace* workspace; const ExecutionStep* step; - vector> reportSubsteps; - vector> recurringSubsteps; + Workspace* workspace; + vector> reportSubsteps; + vector> recurringSubsteps; vector networks; + NetBase* reportNet; Blob* shouldStop{nullptr}; ShouldContinue netShouldContinue; ShouldContinue shouldContinue; std::atomic gotFailure{false}; + + private: + std::unique_ptr localWorkspace_; }; +std::unique_ptr ExecutionStepWrapper::doCompile() { + return std::unique_ptr(new CompiledExecutionStep( + step_, externalWorkspace_, externalShouldContinue_, netDefs_)); +} + #define CHECK_SHOULD_STOP(step, shouldStop) \ if (getShouldStop(shouldStop)) { \ VLOG(1) << "Execution step " << step.name() << " stopped by " \ @@ -169,48 +307,46 @@ struct CompiledExecutionStep { return true; \ } -bool ExecuteStepRecursive(CompiledExecutionStep& compiledStep) { - const auto& step = *(compiledStep.step); - Workspace* ws = compiledStep.workspace; +bool ExecuteStepRecursive(ExecutionStepWrapper& stepWrapper) { + const auto& step = stepWrapper.step(); + auto compiledStep = stepWrapper.compiled(); VLOG(1) << "Running execution step " << step.name(); std::unique_ptr reporter; - if (step.has_report_net() || compiledStep.reportSubsteps.size() > 0) { + if (step.has_report_net() || compiledStep->reportSubsteps.size() > 0) { reporter = caffe2::make_unique(); - if (step.has_report_net()) { - CAFFE_ENFORCE( - step.has_report_interval(), - "A report_interval must be provided if report_net is set."); - auto* net = ws->GetNet(step.report_net()); - if (!net) { - LOG(ERROR) << "Report net " << step.report_net() << " not found."; - } + auto* reportNet = compiledStep->reportNet; + if (reportNet) { VLOG(1) << "Starting reporter net"; - reporter->start(step.report_interval() * 1000, [=]() { - if (!net->Run()) { + reporter->start(step.report_interval() * 
1000, [reportNet]() { + if (!reportNet->Run()) { LOG(WARNING) << "Error running report_net."; } }); } - for (auto& compiledSubstep : compiledStep.reportSubsteps) { - reporter->start(compiledSubstep->step->run_every_ms(), [=]() { - if (!ExecuteStepRecursive(*compiledSubstep)) { - LOG(WARNING) << "Error running report step."; - } - }); + for (auto& substepWrapper : compiledStep->reportSubsteps) { + reporter->start( + substepWrapper->step().run_every_ms(), [substepWrapper]() { + if (!ExecuteStepRecursive(*substepWrapper)) { + LOG(WARNING) << "Error running report step."; + } + }); } } - const Blob* shouldStop = compiledStep.shouldStop; + const Blob* shouldStop = compiledStep->shouldStop; if (step.substep_size()) { - bool sequential = !step.concurrent_substeps() || step.substep().size() <= 1; - for (int64_t iter = 0; compiledStep.shouldContinue(iter); ++iter) { + bool sequential = + (!step.concurrent_substeps() || step.substep().size() <= 1) && + (!step.has_num_concurrent_instances() || + step.num_concurrent_instances() <= 1); + for (int64_t iter = 0; compiledStep->shouldContinue(iter); ++iter) { if (sequential) { VLOG(1) << "Executing step " << step.name() << " iteration " << iter; - for (auto& compiledSubstep : compiledStep.recurringSubsteps) { - if (!ExecuteStepRecursive(*compiledSubstep)) { + for (auto& substepWrapper : compiledStep->recurringSubsteps) { + if (!ExecuteStepRecursive(*substepWrapper)) { return false; } CHECK_SHOULD_STOP(step, shouldStop); @@ -223,45 +359,45 @@ bool ExecuteStepRecursive(CompiledExecutionStep& compiledStep) { std::mutex exception_mutex; string first_exception; auto worker = [&]() { - while (true) { - int substep_id = next_substep++; - if (compiledStep.gotFailure || - (substep_id >= compiledStep.recurringSubsteps.size())) { - break; + auto num_substeps = compiledStep->recurringSubsteps.size(); + int substep_id = next_substep++ % num_substeps; + if (compiledStep->gotFailure) { + return; + } + try { + if (!ExecuteStepRecursive( + *compiledStep->recurringSubsteps.at(substep_id))) { + compiledStep->gotFailure = true; } - try { - if (!ExecuteStepRecursive( - *compiledStep.recurringSubsteps.at(substep_id))) { - compiledStep.gotFailure = true; - } - } catch (const std::exception& ex) { - std::lock_guard guard(exception_mutex); - if (!first_exception.size()) { - first_exception = GetExceptionString(ex); - LOG(ERROR) << "Parallel worker exception:\n" << first_exception; - } - compiledStep.gotFailure = true; - if (!FLAGS_caffe2_handle_executor_threads_exceptions) { - // In complex plans other threads might get stuck if another - // one fails. So we let exception to go out of thread which - // causes SIGABRT. In local setup one might use this flag - // in order to use Python debugger after a failure - throw; - } + } catch (const std::exception& ex) { + std::lock_guard guard(exception_mutex); + if (!first_exception.size()) { + first_exception = GetExceptionString(ex); + LOG(ERROR) << "Parallel worker exception:\n" << first_exception; + } + compiledStep->gotFailure = true; + if (!FLAGS_caffe2_handle_executor_threads_exceptions) { + // In complex plans other threads might get stuck if another + // one fails. So we let exception to go out of thread which + // causes SIGABRT. 
In local setup one might use this flag
+          // in order to use Python debugger after a failure
+          throw;
         }
       }
     };
     std::vector<std::thread> threads;
-    for (int64_t i = 0; i < step.substep().size(); ++i) {
-      if (!step.substep().Get(i).has_run_every_ms()) {
-        threads.emplace_back(worker);
-      }
+    auto numThreads = compiledStep->recurringSubsteps.size();
+    if (step.has_num_concurrent_instances()) {
+      numThreads *= step.num_concurrent_instances();
+    }
+    for (int64_t i = 0; i < numThreads; ++i) {
+      threads.emplace_back(worker);
     }
     for (auto& thread : threads) {
       thread.join();
     }
-    if (compiledStep.gotFailure) {
+    if (compiledStep->gotFailure) {
       LOG(ERROR) << "One of the workers failed.";
       if (first_exception.size()) {
         CAFFE_THROW(
@@ -277,9 +413,9 @@ bool ExecuteStepRecursive(CompiledExecutionStep& compiledStep) {
     return true;
   } else {
     // If this ExecutionStep just contains nets, we can directly run it.
-    for (int64_t iter = 0; compiledStep.shouldContinue(iter); ++iter) {
+    for (int64_t iter = 0; compiledStep->shouldContinue(iter); ++iter) {
       VLOG(1) << "Executing networks " << step.name() << " iteration " << iter;
-      for (NetBase* network : compiledStep.networks) {
+      for (NetBase* network : compiledStep->networks) {
         if (!network->Run()) {
           return false;
         }
@@ -305,30 +441,22 @@ bool RunPlanOnWorkspace(
   }
 
   LOG(INFO) << "Initializing networks.";
-  std::set<std::string> seen_net_names_in_plan;
+  NetDefMap net_defs;
   for (const NetDef& net_def : plan.network()) {
     CAFFE_ENFORCE(
-        seen_net_names_in_plan.count(net_def.name()) == 0,
+        net_defs.count(net_def.name()) == 0,
         "Your plan contains networks of the same name \"",
         net_def.name(),
         "\", which should not happen. Check your plan to see "
         "if you made a programming error in creating the plan.");
-    seen_net_names_in_plan.insert(net_def.name());
-    // TODO(jiayq): consider if we want to override the default choice of
-    // overwriting the nets if exists. The rationale here is that, a plan
-    // is considered a big end-to-end thing (like a whole training run) and
-    // is similar to the old Caffe Solver. It is big enough that we want to
-    // give it a full control over the current workspace.
-    if (!ws->CreateNet(net_def, true)) {
-      LOG(ERROR) << "Failed initializing the networks.";
-      return false;
-    }
+    auto netAlreadyExists = ws->GetNet(net_def.name()) != nullptr;
+    net_defs[net_def.name()] = NetDefInfo{&net_def, netAlreadyExists};
   }
   Timer plan_timer;
   for (const ExecutionStep& step : plan.execution_step()) {
     Timer step_timer;
-    CompiledExecutionStep compiledStep(&step, ws, shouldContinue);
-    if (!ExecuteStepRecursive(compiledStep)) {
+    ExecutionStepWrapper stepWrapper(&step, ws, shouldContinue, &net_defs);
+    if (!ExecuteStepRecursive(stepWrapper)) {
       LOG(ERROR) << "Failed initializing step " << step.name();
       return false;
     }
diff --git a/caffe2/proto/caffe2.proto b/caffe2/proto/caffe2.proto
index 403aa386de680..4a67b913a3b51 100644
--- a/caffe2/proto/caffe2.proto
+++ b/caffe2/proto/caffe2.proto
@@ -263,6 +263,13 @@ message ExecutionStep {
   // if only_once is true, this step will only be executed once. this ONLY takes
   // effect when using should_stop_blob
   optional bool only_once = 10;
+
+  // Whether to create a child workspace for this step.
+  // If yes, the workspace and nets are re-created every time this step is run.
+  optional bool create_workspace = 12;
+
+  // How many copies of the child execution steps to run concurrently.
+ optional int32 num_concurrent_instances = 13; } message PlanDef { diff --git a/caffe2/python/checkpoint_test.py b/caffe2/python/checkpoint_test.py index ccd78f472a806..9bff907bfbdbc 100644 --- a/caffe2/python/checkpoint_test.py +++ b/caffe2/python/checkpoint_test.py @@ -108,8 +108,9 @@ def test_load_model_from_checkpoints(self): num_epochs = job_runner(session) self.assertEquals(num_epochs, len(EXPECTED_TOTALS)) - # There are 17 blobs after finishing up the job runner. - self.assertEquals(len(ws.blobs), 17) + # There are 12 global blobs after finishing up the job runner. + # (only blobs on init_group are checkpointed) + self.assertEquals(len(ws.blobs), 12) ws = workspace.C.Workspace() session = LocalSession(ws) diff --git a/caffe2/python/core.py b/caffe2/python/core.py index 2451629be6566..bb5634c20553b 100644 --- a/caffe2/python/core.py +++ b/caffe2/python/core.py @@ -2215,6 +2215,14 @@ def SetIter(self, num_iter): self._assert_can_mutate() self._step.num_iter = num_iter + def SetCreateWorkspace(self, create_workspace): + self._assert_can_mutate() + self._step.create_workspace = create_workspace + + def SetNumConcurrentInstances(self, num_concurrent_instances): + self._assert_can_mutate() + self._step.num_concurrent_instances = num_concurrent_instances + def SetOnlyOnce(self, only_once): self._assert_can_mutate() self._step.only_once = only_once @@ -2368,7 +2376,9 @@ def execution_step(default_name, report_interval=None, concurrent_substeps=None, should_stop_blob=None, - only_once=None): + only_once=None, + num_concurrent_instances=None, + create_workspace=False): """ Helper for creating an ExecutionStep. - steps_or_nets can be: @@ -2400,6 +2410,10 @@ def execution_step(default_name, if report_net is not None: assert report_interval is not None step.SetReportNet(report_net, report_interval) + if num_concurrent_instances is not None: + step.SetNumConcurrentInstances(num_concurrent_instances) + if create_workspace: + step.SetCreateWorkspace(True) if isinstance(steps_or_nets, ExecutionStep): step.AddSubstep(steps_or_nets) diff --git a/caffe2/python/dataio_test.py b/caffe2/python/dataio_test.py index c0549250967d7..2860471f5425c 100644 --- a/caffe2/python/dataio_test.py +++ b/caffe2/python/dataio_test.py @@ -8,34 +8,93 @@ from caffe2.python.pipeline import pipe from caffe2.python.schema import Struct, NewRecord, FeedRecord from caffe2.python.session import LocalSession -from caffe2.python.task import TaskGroup +from caffe2.python.task import TaskGroup, final_output, WorkspaceType from caffe2.python.test_util import TestCase from caffe2.python import core, workspace +from caffe2.python.net_builder import ops import numpy as np +def init_dataset(ws): + src_init = core.Net('src_init') + with core.NameScope('src'): + src_values = Struct(('label', np.array(range(100)))) + src_blobs = NewRecord(src_init, src_values) + src_ds = Dataset(src_blobs) + FeedRecord(src_blobs, src_values, ws) + ws.run(src_init) + return src_ds + + class TestReaderWithLimit(TestCase): + def test_runtime_threads(self): + ws = workspace.C.Workspace() + session = LocalSession(ws) + src_ds = init_dataset(ws) + totals = [None] * 3 + + def proc(rec): + # executed once + with ops.task_init(): + counter1 = ops.CreateCounter([], ['global_counter']) + counter2 = ops.CreateCounter([], ['global_counter2']) + counter3 = ops.CreateCounter([], ['global_counter3']) + # executed once per thread + with ops.task_instance_init(): + task_counter = ops.CreateCounter([], ['task_counter']) + # executed on each iteration + ops.CountUp(counter1) + 
ops.CountUp(task_counter) + # executed once per thread + with ops.task_instance_exit(): + with ops.loop(ops.RetrieveCount(task_counter)): + ops.CountUp(counter2) + ops.CountUp(counter3) + # executed once + with ops.task_exit(): + totals[0] = final_output(ops.RetrieveCount(counter1)) + totals[1] = final_output(ops.RetrieveCount(counter2)) + totals[2] = final_output(ops.RetrieveCount(counter3)) + return rec + + """ 1. Feed full dataset """ + with TaskGroup() as tg: + pipe(src_ds.reader(), num_runtime_threads=8, processor=proc) + session.run(tg) + self.assertEquals(totals[0].fetch(), 100) + self.assertEquals(totals[1].fetch(), 100) + self.assertEquals(totals[2].fetch(), 8) + + """ 2. Add a few steps in between """ + with TaskGroup() as tg: + q1 = pipe(src_ds.reader(), num_runtime_threads=2) + q2 = pipe( + ReaderWithLimit(q1.reader(), num_iter=25), + num_runtime_threads=3) + pipe(q2, processor=proc, num_runtime_threads=6) + session.run(tg) + self.assertEquals(totals[0].fetch(), 25) + self.assertEquals(totals[1].fetch(), 25) + self.assertEquals(totals[2].fetch(), 6) + + def test_reader_with_limit(self): ws = workspace.C.Workspace() session = LocalSession(ws) """ 1. feed full dataset """ - src_init = core.Net('src_init') - with core.NameScope('src'): - src_values = Struct(('label', np.array(list(range(100))))) - src_blobs = NewRecord(src_init, src_values) - src_ds = Dataset(src_blobs) - FeedRecord(src_blobs, src_values, ws) - ws.run(src_init) + src_ds = init_dataset(ws) """ 2. Read with limit smaller than size of dataset """ dst_init = core.Net('dst_init') with core.NameScope('dst'): - dst_ds = Dataset(src_values.clone_schema()) + dst_ds = Dataset(src_ds.content().clone_schema()) dst_ds.init_empty(dst_init) ws.run(dst_init) - with TaskGroup() as tg: + # WorkspaceType.GLOBAL is required because we are fetching + # reader.data_finished() after the TaskGroup finishes. + with TaskGroup(workspace_type=WorkspaceType.GLOBAL) as tg: reader = ReaderWithLimit(src_ds.reader(), num_iter=10) pipe(reader, dst_ds.writer(), num_threads=8) session.run(tg) @@ -48,9 +107,9 @@ def test_reader_with_limit(self): """ 3. Read with limit larger than size of dataset """ ws.run(dst_init) - with TaskGroup() as tg: + with TaskGroup(workspace_type=WorkspaceType.GLOBAL) as tg: reader = ReaderWithLimit(src_ds.reader(), num_iter=110) - pipe(reader, dst_ds.writer(), num_threads=8) + pipe(reader, dst_ds.writer(), num_runtime_threads=8) session.run(tg) self.assertEquals( sorted(ws.blobs[str(dst_ds.content().label())].fetch()), @@ -60,7 +119,7 @@ def test_reader_with_limit(self): """ 4. Read without counter """ ws.run(dst_init) - with TaskGroup() as tg: + with TaskGroup(workspace_type=WorkspaceType.GLOBAL) as tg: reader = ReaderWithLimit(src_ds.reader(), num_iter=None) pipe(reader, dst_ds.writer(), num_threads=8) session.run(tg) diff --git a/caffe2/python/net_builder.py b/caffe2/python/net_builder.py index 86ccd5e4c4156..fbec58ad8b34f 100644 --- a/caffe2/python/net_builder.py +++ b/caffe2/python/net_builder.py @@ -238,6 +238,11 @@ def task_init(self): Defines operations that will be executed once at task startup. Useful when implementing processors, that don't have access to the Task top-level structure. + + This setup will be run only once, even if multiple instances of the task + will run in parallel. For instance-local initialization, use + `task_instance_init` instead. 
+ Example: def my_processor(rec): with ops.task_init(): @@ -252,9 +257,14 @@ def my_processor(rec): def task_exit(self): """ - Define operations to be executed at task shutdown. + Define operations to be executed once at task shutdown. Useful when implementing processors, that don't have access to the Task top-level structure. + + This shutdown will be run only once, after all concurrent instances of + the task have already finished. For instance-local shutdown, + use `task_instance_exit` instead. + Example: def read_queue(queue): with ops.task_exit(): @@ -265,10 +275,37 @@ def read_queue(queue): self.net().add_attribute(Task.TASK_SETUP, setup) return setup + def task_instance_init(self): + """ + Defines operations that will be executed once at startup of each + instance of a task. This can be seen as "thread_local" initialization. + It is guaranteed to run only after all `task_init` logic finishes. + + This setup will be run concurrently for each instance of a task. + For global task initialization, use `task_init` instead. + """ + setup = _SetupBuilder(_SetupBuilder.INIT) + self.net().add_attribute(Task.TASK_INSTANCE_SETUP, setup) + return setup + + def task_instance_exit(self): + """ + Defines operations that will be executed once at shutdown of each + instance of a task. This can be seen as "thread_local" finalization. + + This shutdown will be run concurrently for each instance of a task. + For global task shutdown, use `task_exit` instead. + """ + setup = _SetupBuilder(_SetupBuilder.EXIT) + self.net().add_attribute(Task.TASK_INSTANCE_SETUP, setup) + return setup + def local_init(self): """ Similar to `task_init`, but executes at TaskGroup's startup instead, - before any task of the group starts executing. + before any task of the group starts executing. This will run only + once on each node, before initialization of any task, so it can be + used e.g. to initialize blobs shared across tasks. """ setup = _SetupBuilder(_SetupBuilder.INIT) self.net().add_attribute(TaskGroup.LOCAL_SETUP, setup) @@ -276,8 +313,9 @@ def local_init(self): def local_exit(self): """ - Similar to `task_init`, but executes at TaskGroup's exit instead, + Similar to `task_exit`, but executes at TaskGroup's exit instead, after all tasks of the group finished execution. + This will run only once on each node. 
""" setup = _SetupBuilder(_SetupBuilder.EXIT) self.net().add_attribute(TaskGroup.LOCAL_SETUP, setup) diff --git a/caffe2/python/net_builder_test.py b/caffe2/python/net_builder_test.py index 6720bb4368342..1d29711ec916c 100644 --- a/caffe2/python/net_builder_test.py +++ b/caffe2/python/net_builder_test.py @@ -4,11 +4,31 @@ from __future__ import unicode_literals from caffe2.python import workspace -from caffe2.python.core import Plan, to_execution_step -from caffe2.python.task import Task, final_output +from caffe2.python.core import Plan, to_execution_step, Net +from caffe2.python.task import Task, TaskGroup, final_output from caffe2.python.net_builder import ops, NetBuilder from caffe2.python.session import LocalSession import unittest +import threading + + +class PythonOpStats(object): + lock = threading.Lock() + num_instances = 0 + num_calls = 0 + + +def python_op_builder(): + PythonOpStats.lock.acquire() + PythonOpStats.num_instances += 1 + PythonOpStats.lock.release() + + def my_op(inputs, outputs): + PythonOpStats.lock.acquire() + PythonOpStats.num_calls += 1 + PythonOpStats.lock.release() + + return my_op def _test_loop(): @@ -122,6 +142,18 @@ def _actual_loop(self): for x in [total, total_large, total_small, total_tiny] ] + def test_net_multi_use(self): + with Task() as task: + total = ops.Const(0) + net = Net('my_net') + net.Add([total, net.Const(1)], [total]) + ops.net(net) + ops.net(net) + result = final_output(total) + with LocalSession() as session: + session.run(task) + self.assertEquals(2, result.fetch()) + def test_loops(self): with Task() as task: out_actual = self._actual_loop() @@ -155,3 +187,61 @@ def test_setup(self): self.assertEquals(o6.fetch(), 6) self.assertEquals(o7_1.fetch(), 7) self.assertEquals(o7_2.fetch(), 7) + + def test_multi_instance_python_op(self): + """ + When task instances are created at runtime, C++ concurrently creates + multiple instances of operators in C++, and concurrently destroys them + once the task is finished. This means that the destructor of PythonOp + will be called concurrently, so the GIL must be acquired. This + test exercises this condition. + """ + with Task(num_instances=64) as task: + with ops.loop(4): + ops.Python((python_op_builder, [], {}))([], []) + with LocalSession() as session: + PythonOpStats.num_instances = 0 + PythonOpStats.num_calls = 0 + session.run(task) + self.assertEquals(PythonOpStats.num_instances, 64) + self.assertEquals(PythonOpStats.num_calls, 256) + + def test_multi_instance(self): + NUM_INSTANCES = 10 + NUM_ITERS = 15 + with TaskGroup() as tg: + with Task(num_instances=NUM_INSTANCES): + with ops.task_init(): + counter1 = ops.CreateCounter([], ['global_counter']) + counter2 = ops.CreateCounter([], ['global_counter2']) + counter3 = ops.CreateCounter([], ['global_counter3']) + # both task_counter and local_counter should be thread local + with ops.task_instance_init(): + task_counter = ops.CreateCounter([], ['task_counter']) + local_counter = ops.CreateCounter([], ['local_counter']) + with ops.loop(NUM_ITERS): + ops.CountUp(counter1) + ops.CountUp(task_counter) + ops.CountUp(local_counter) + # gather sum of squares of local counters to make sure that + # each local counter counted exactly up to NUM_ITERS, and + # that there was no false sharing of counter instances. 
+ with ops.task_instance_exit(): + count2 = ops.RetrieveCount(task_counter) + with ops.loop(ops.Mul([count2, count2])): + ops.CountUp(counter2) + # This should have the same effect as the above + count3 = ops.RetrieveCount(local_counter) + with ops.loop(ops.Mul([count3, count3])): + ops.CountUp(counter3) + # The code below will only run once + with ops.task_exit(): + total1 = final_output(ops.RetrieveCount(counter1)) + total2 = final_output(ops.RetrieveCount(counter2)) + total3 = final_output(ops.RetrieveCount(counter3)) + + with LocalSession() as session: + session.run(tg) + self.assertEquals(total1.fetch(), NUM_INSTANCES * NUM_ITERS) + self.assertEquals(total2.fetch(), NUM_INSTANCES * (NUM_ITERS ** 2)) + self.assertEquals(total3.fetch(), NUM_INSTANCES * (NUM_ITERS ** 2)) diff --git a/caffe2/python/net_printer.py b/caffe2/python/net_printer.py index 6d5d718e86d5f..da0f7526037ba 100644 --- a/caffe2/python/net_printer.py +++ b/caffe2/python/net_printer.py @@ -88,22 +88,24 @@ def analyze_net(analyzer, net): @Analyzer.register(ExecutionStep) def analyze_step(analyzer, step): proto = step.Proto() - if proto.report_net: - with analyzer.set_workspace(do_copy=True): - analyzer(step.get_net(proto.report_net)) - all_new_blobs = set() - substeps = step.Substeps() + [step.get_net(n) for n in proto.network] - for substep in substeps: - with analyzer.set_workspace(do_copy=proto.concurrent_substeps) as ws_in: - analyzer(substep) - if proto.should_stop_blob: - analyzer.need_blob(proto.should_stop_blob) - if proto.concurrent_substeps: - new_blobs = set(ws_in.keys()) - set(analyzer.workspace.keys()) - assert len(all_new_blobs & new_blobs) == 0, ( - 'Error: Blobs created by multiple parallel steps: %s' % ( - ', '.join(all_new_blobs & new_blobs))) - all_new_blobs |= new_blobs + with analyzer.set_workspace(do_copy=proto.create_workspace): + if proto.report_net: + with analyzer.set_workspace(do_copy=True): + analyzer(step.get_net(proto.report_net)) + all_new_blobs = set() + substeps = step.Substeps() + [step.get_net(n) for n in proto.network] + for substep in substeps: + with analyzer.set_workspace( + do_copy=proto.concurrent_substeps) as ws_in: + analyzer(substep) + if proto.should_stop_blob: + analyzer.need_blob(proto.should_stop_blob) + if proto.concurrent_substeps: + new_blobs = set(ws_in.keys()) - set(analyzer.workspace.keys()) + assert len(all_new_blobs & new_blobs) == 0, ( + 'Error: Blobs created by multiple parallel steps: %s' % ( + ', '.join(all_new_blobs & new_blobs))) + all_new_blobs |= new_blobs for x in all_new_blobs: analyzer.define_blob(x) @@ -261,6 +263,11 @@ def _get_step_context(step): return call('loop'), False if proto.num_iter and proto.num_iter != 1: return call('loop', [proto.num_iter]), False + if proto.num_concurrent_instances > 1: + return ( + call('parallel', + [('num_instances', proto.num_concurrent_instances)]), + len(step.Substeps()) > 1) concurrent = proto.concurrent_substeps and len(step.Substeps()) > 1 if concurrent: return call('parallel'), True @@ -279,13 +286,18 @@ def print_step(text, step): text(step.get_net(proto.report_net)) substeps = step.Substeps() + [step.get_net(n) for n in proto.network] for substep in substeps: - if (isinstance(substep, ExecutionStep) and - substep.Proto().run_every_ms): + sub_proto = ( + substep.Proto() if isinstance(substep, ExecutionStep) else None) + if sub_proto is not None and sub_proto.run_every_ms: substep_ctx = call( 'reporter', - [str(substep), ('interval_ms', substep.Proto().run_every_ms)]) + [str(substep), ('interval_ms', 
sub_proto.run_every_ms)]) elif do_substep: - substep_ctx = call('step', [str(substep)]) + title = ( + 'workspace' + if sub_proto is not None and sub_proto.create_workspace else + 'step') + substep_ctx = call(title, [str(substep)]) else: substep_ctx = None with text.context(substep_ctx): diff --git a/caffe2/python/net_printer_test.py b/caffe2/python/net_printer_test.py index 5b87e696e11a5..bc086c3eee2ac 100644 --- a/caffe2/python/net_printer_test.py +++ b/caffe2/python/net_printer_test.py @@ -6,7 +6,7 @@ from caffe2.python import net_printer from caffe2.python.checkpoint import Job from caffe2.python.net_builder import ops -from caffe2.python.task import Task, final_output +from caffe2.python.task import Task, final_output, WorkspaceType import unittest @@ -47,8 +47,16 @@ def example_task(): o6 = final_output(six) o7_1 = final_output(seven_1) o7_2 = final_output(seven_2) - return o6, o7_1, o7_2 + with Task(num_instances=2): + with ops.task_init(): + one = ops.Const(1) + with ops.task_instance_init(): + local = ops.Const(2) + ops.Add([one, local], [one]) + ops.LogInfo('ble') + + return o6, o7_1, o7_2 def example_job(): with Job() as job: @@ -68,7 +76,8 @@ def test_valid_job(self): with Task(): # distributed_ctx_init_* ignored by analyzer ops.Add(['distributed_ctx_init_a', 'distributed_ctx_init_b']) - net_printer.analyze(example_job()) + # net_printer.analyze(example_job()) + print(net_printer.to_string(example_job())) def test_undefined_blob(self): job = example_job() @@ -82,9 +91,9 @@ def test_undefined_blob(self): def test_multiple_definition(self): job = example_job() with job: - with Task(): + with Task(workspace_type=WorkspaceType.GLOBAL): ops.Add([ops.Const(0), ops.Const(1)], 'out1') - with Task(): + with Task(workspace_type=WorkspaceType.GLOBAL): ops.Add([ops.Const(2), ops.Const(3)], 'out1') with self.assertRaises(AssertionError): net_printer.analyze(job) diff --git a/caffe2/python/pipeline.py b/caffe2/python/pipeline.py index 9d55e6832e251..235a3fd3d51f4 100644 --- a/caffe2/python/pipeline.py +++ b/caffe2/python/pipeline.py @@ -93,7 +93,7 @@ def normalize_processor_output(output): def pipe( input, output=None, num_threads=1, processor=None, name=None, - capacity=None, group=None): + capacity=None, group=None, num_runtime_threads=1): """ Given a Reader, Queue or DataStream in `input`, and optionally, a Writer, Queue or DataStream in `output`, creates a Task that, when run, will @@ -113,6 +113,9 @@ def pipe( piping. If set to 0, no Task is created, and a reader is returned instead -- the reader returned will read from the reader passed in and process it. + ** DEPRECATED **. Use `num_runtime_threads` instead. + This option will be removed once all readers/processors + support `num_runtime_threads`. processor: (optional) function that takes an input record and optionally returns a record; this will be called between read and write steps. If the processor does @@ -125,19 +128,25 @@ def pipe( is created and written to. group: (optional) explicitly add the created Task to this TaskGroup, instead of using the currently active one. + num_runtime_threads: Similar to `num_threads`, but instead of expanding + the tasks with a `for` loop in python, does that at + runtime. This is preferable to `num_threads`, but some + processors/readers still require to be called multiple + times in python. Returns: Output Queue, DataStream, Reader, or None, depending on the parameters passed. 
""" result, _ = _pipe_step( - input, output, num_threads, processor, name, capacity, group) + input, output, num_threads, processor, name, capacity, group, + num_runtime_threads) return result def pipe_and_output( input, output=None, num_threads=1, processor=None, name=None, - capacity=None, group=None, final_outputs=None): + capacity=None, group=None, num_runtime_threads=1, final_outputs=None): """ Similar to `pipe`, with the additional ability for the pipe Task to return output values to the `Session` once done. @@ -151,7 +160,7 @@ def pipe_and_output( assert num_threads > 0 result, task = _pipe_step( input, output, num_threads, processor, name, capacity, group, - final_outputs) + num_runtime_threads, final_outputs) output = None if final_outputs is not None: output = task.outputs() @@ -172,32 +181,62 @@ def processor_name(processor): return processor.__class__.__name__ -def _pipe_step( - input, output=None, num_threads=1, processor=None, name=None, - capacity=None, group=None, final_outputs=None): - """ - """ - if isinstance(input, Reader): - reader = input - elif hasattr(input, 'reader'): - reader = input.reader() - else: - raise ValueError('in must be a reader, queue or streaam.') +def _runtime_threads_task(name, group, final_outputs, reader, num_threads, + output, capacity): + node_name = str(Node.current()) + profiler_name = "{0}/{1}/{2}/{3}/{4}".format( + node_name, + "pipe", + name, + processor_name(input) if input else "NoInput", + processor_name(output) if output else "NoOutput") - if processor is not None: - reader = ProcessingReader(reader, processor) + with Task(name=name, group=group, outputs=final_outputs, + num_instances=num_threads) as task: + global_exit_net = core.Net('pipe:exit') + global_init_net = core.Net('pipe:init') + reader.setup_ex(global_init_net, global_exit_net) - if num_threads == 0: - assert output is None - return reader, None + init_net = core.Net('pipe:instance:init') + exit_net = core.Net('pipe:instance:exit') + read_nets, status, rec = reader.read_record_ex(init_net, exit_net) - if name is None and processor is not None: - name = processor_name(processor) - if name is None and output is not None: - name = 'pipe_into:%s' % processor_name(output) - if name is None: - name = 'pipe_from:%s' % processor_name(input) + if rec is not None: + out_queue, writer = _init_output( + output, capacity, global_init_net, global_exit_net) + write_nets, _ = writer.write_record_ex( + rec, init_net, exit_net, status) + else: + out_queue = None + write_nets = [] + + with ops.task_init(): + ops.net(global_init_net) + with ops.task_instance_init(): + ops.net(init_net) + + timer_start_net = core.Net('timer_start') + timer = timer_start_net.TimerBegin([], counter_name=profiler_name) + timer_end_net = core.Net('timer_end') + timer_end_net.TimerEnd(timer, []) + + ops.net(core.execution_step( + 'body', + [timer_start_net] + list(read_nets) + list(write_nets) + + [timer_end_net], + should_stop_blob=status)) + ops.net(timer_end_net) + + with ops.task_instance_exit(): + ops.net(exit_net) + with ops.task_exit(): + ops.net(global_exit_net) + return out_queue, task + + +def _static_threads_task(name, group, final_outputs, reader, num_threads, + output, capacity): node_name = str(Node.current()) profiler_name = "{0}/{1}/{2}/{3}/{4}".format( node_name, @@ -261,6 +300,44 @@ def _pipe_step( return out_queue, task +def _pipe_step( + input, output=None, num_threads=1, processor=None, name=None, + capacity=None, group=None, num_runtime_threads=None, final_outputs=None): + """ + """ + assert 
num_threads <= 1 or num_runtime_threads <= 1, ( + 'Only one of num_threads or num_runtime_threads must be set.') + + if isinstance(input, Reader): + reader = input + elif hasattr(input, 'reader'): + reader = input.reader() + else: + raise ValueError('in must be a reader, queue or stream.') + + if processor is not None: + reader = ProcessingReader(reader, processor) + + if num_threads == 0 or num_runtime_threads == 0: + assert output is None + return reader, None + + if name is None and processor is not None: + name = processor_name(processor) + if name is None and output is not None: + name = 'pipe_into:%s' % processor_name(output) + if name is None: + name = 'pipe_from:%s' % processor_name(input) + + if num_threads > 1: + return _static_threads_task( + name, group, final_outputs, reader, num_threads, output, capacity) + else: + return _runtime_threads_task( + name, group, final_outputs, reader, num_runtime_threads, output, + capacity) + + class ProcessingReader(Reader): """ Reader that reads from an upstream reader, calls the processor, and returns diff --git a/caffe2/python/pybind_state.cc b/caffe2/python/pybind_state.cc index 28fba7bfbe41d..747ccfbcc3f8d 100644 --- a/caffe2/python/pybind_state.cc +++ b/caffe2/python/pybind_state.cc @@ -250,7 +250,13 @@ PythonOpBase::PythonOpBase( } } -PythonOpBase::~PythonOpBase() {} +PythonOpBase::~PythonOpBase() { + if (built_func_) { + // since it may trigger python interpreter when refcount reaches zero + py::gil_scoped_acquire g; + built_func_.reset(); + } +} bool PythonOpBase::RunOnDevice() { std::vector inputs; diff --git a/caffe2/python/session.py b/caffe2/python/session.py index dff7d71c83e9e..5217b4256616f 100644 --- a/caffe2/python/session.py +++ b/caffe2/python/session.py @@ -68,8 +68,9 @@ class Session(object): Blob visibility: Tasks running in different nodes in parallel will always run under different workspaces, so it must be assumed that they won't be able to - access each other's blobs. On the other hand, tasks running on the same - node are guaranteed to run on the same workspace within a run. + access each other's blobs. Tasks running on the same node will follow + Workspace hierarchy rules: tasks running on separate private workspaces + will only be able to share blobs defined on a common parent Workspace. 
""" _compiled_cache = {} diff --git a/caffe2/python/task.py b/caffe2/python/task.py index 1c5621ccf0b19..710173ba95e14 100644 --- a/caffe2/python/task.py +++ b/caffe2/python/task.py @@ -142,6 +142,18 @@ def get_setup_nets(key, steps_or_nets, target): return init_nets, exit_nets +def add_setup_steps(step, init_nets, exit_nets, name): + if not init_nets and not exit_nets: + return step + steps = [] + if init_nets: + steps.append(core.execution_step('%s:init' % name, init_nets)) + steps.append(step) + if len(exit_nets) > 0: + steps.append(core.execution_step('%s:exit' % name, exit_nets)) + return core.execution_step(name, steps) + + @context.define_context(allow_default=False) class TaskGroup(object): """ @@ -279,14 +291,18 @@ def tasks_by_node(self, node_remap=None): # shortcut for single task with no queue steps = report_steps outputs = [] - workspace_type = tasks[0].workspace_type() + grouped_workspace_type = WorkspaceType.PRIVATE for task in tasks: step = task.get_step() + step.SetCreateWorkspace( + task.workspace_type() == WorkspaceType.PRIVATE) if step is not None: steps.append(step) outputs += task.outputs() - assert workspace_type == task.workspace_type(), ( - 'All tasks for a given node need same workspace type.') + # If any of the tasks in the node uses the global workspace, + # then set the grouped task to use the global workspace as well + if task.workspace_type() == WorkspaceType.GLOBAL: + grouped_workspace_type = WorkspaceType.GLOBAL if len(steps) == 0: steps.append(core.execution_step('empty', [])) if len(steps) == 1: @@ -307,7 +323,7 @@ def tasks_by_node(self, node_remap=None): Task( node=node, step=step, outputs=outputs, name='grouped_by_node', - group=grouped_by_node, workspace_type=workspace_type) + group=grouped_by_node, workspace_type=grouped_workspace_type) self._tasks_by_node = (grouped_by_node, node_map) return grouped_by_node @@ -406,9 +422,34 @@ class Task(object): be run by a Session. Task outputs are fetched by the session at the end of the run. + + The recommended way of creating a task is by using `net_builder.ops`. + Example: + + from net_builder import ops + with Node('trainer'), Task(name='my_task', num_instances=2): + with ops.task_init(): + globl = ops.Const(0) + with ops.task_instance_init(): + local = ops.Const(0) + with ops.loop(100): + ops.Copy(globl, local) + with ops.task_instance_exit(): + ops.Add([globl, local], [globl]) + with ops.task_exit(): + ops.Mul([globl, globl], [blobl]) + + The task above will create 2 instances that will run in parallel. + Each instance will copy `local` to `globl` 100 times, Then Add `local` + to `globl` once. The `Mul` will only execute once, after all the instances + of the task have finished. """ + # TASK_SETUP runs once per task, before/after all + # concurrent task instances start/finish. TASK_SETUP = 'task_setup' + # Setup will run once for each instance of the task. + TASK_INSTANCE_SETUP = 'task_instance_setup' REPORT_STEP = 'report_step' _global_names_used = set() @@ -428,9 +469,20 @@ def _get_next_name(node, group, name): def __init__( self, step=None, outputs=None, - workspace_type=None, group=None, node=None, name=None): + workspace_type=None, group=None, node=None, name=None, + num_instances=None): """ Instantiate a Task and add it to the current TaskGroup and Node. + + Args: + step: If provided, this task will run this ExecutionStep. + outputs: If provided, the task will return the provided outputs + to the client at completion time. + node: If provided, force task execution on the given node. 
+ name: Name of the Task. + num_instances: If provided, this task will be cloned num_instances + times at runtime, and all instances will run + concurrently. """ if not name and isinstance(step, core.ExecutionStep): name = step.Proto().name @@ -459,6 +511,7 @@ def __init__( self._is_pipeline_context = False self._workspace_type = workspace_type self._report_net = None + self._num_instances = num_instances def __enter__(self): # temporarily remove from _tasks_to_add to ensure correct order @@ -505,36 +558,50 @@ def set_step(self, step): self._step = core.to_execution_step(step) def get_step(self): - if self._step is not None and self._step_with_setup is None: - report_steps = [ - s - for s in self._step.get_all_attributes(Task.REPORT_STEP) - if not hasattr(s, '_report_step_used') - ] - for step in report_steps: - step._report_step_used = True - if not step.Proto().run_every_ms: - step.RunEveryMillis(1000) - init_nets, exit_nets = get_setup_nets( - Task.TASK_SETUP, [self._step] + report_steps, self) - if len(self._outputs) == 0: - output_net = core.Net('%s:output' % self.name) - self.add_output(output_net.ConstantFill( - [], 1, dtype=core.DataType.INT32, value=0)) - exit_nets.append(output_net) - - body = self._step if not report_steps else core.execution_step( - '%s:body' % self.name, report_steps + [self._step]) - self._step_with_setup = core.execution_step( - self.name, - [ - core.execution_step('%s:init' % self.name, init_nets), - body, - core.execution_step('%s:exit' % self.name, exit_nets), - ] - ) - elif self._step_with_setup is None: + if self._step_with_setup is not None: + return self._step_with_setup + + if self._step is None: self._step_with_setup = core.execution_step(self.name, []) + return self._step_with_setup + + report_steps = [ + s + for s in self._step.get_all_attributes(Task.REPORT_STEP) + if not hasattr(s, '_report_step_used') + ] + for step in report_steps: + step._report_step_used = True + if not step.Proto().run_every_ms: + step.RunEveryMillis(1000) + task_init_nets, task_exit_nets = get_setup_nets( + Task.TASK_SETUP, [self._step] + report_steps, self) + instance_init_nets, instance_exit_nets = get_setup_nets( + Task.TASK_INSTANCE_SETUP, [self._step] + report_steps, self) + if len(self._outputs) == 0: + output_net = core.Net('%s:output' % self.name) + self.add_output(output_net.ConstantFill( + [], 1, dtype=core.DataType.INT32, value=0)) + task_exit_nets.append(output_net) + + # Add instance-level report steps + body = self._step if not report_steps else core.execution_step( + '%s:body' % self.name, report_steps + [self._step]) + # Enclose with instance-level (thread-local) setup nets + step_with_instance_setup = add_setup_steps( + body, instance_init_nets, instance_exit_nets, + self.name + ':instance') + # Set up runtime concurrent instances + if self._num_instances and self._num_instances > 1: + step_with_instance_setup.SetCreateWorkspace(True) + step_with_instance_setup = core.execution_step( + '%s:parallel', + [step_with_instance_setup], + num_concurrent_instances=self._num_instances) + # Enclose with task-level setup nets + self._step_with_setup = add_setup_steps( + step_with_instance_setup, task_init_nets, task_exit_nets, self.name) + return self._step_with_setup def output_list(self):