Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cluster backend #1281

Merged
merged 28 commits into from
Aug 23, 2021
Merged

Cluster backend #1281

merged 28 commits into from
Aug 23, 2021

Conversation

hitomitak
Copy link
Contributor

@hitomitak hitomitak commented Jun 24, 2021

Summary

Take over #1084

Adds a new option of the backend to provide the user's executor. When user gives dask client as executor, Aer can execute a simulation on the distributed machines like HPC clusters.

When the executor is set, AerJobSet object is returned instead of a normal AerJob object.
AerJobSet divides multiple experiments in one qobj into each experiment and submits each qobj to the executor as AerJob.
After simulations, AerJobSet collects each result and combines them into one result object.

Example Usage:
Threadpool execution:

exc = ThreadPoolExecutor(max_workers=2)
qbackend = Aer.get_backend('qasm_simulator')
result_ideal = qiskit.execute(circ_list, qbackend, executor=exc).result()

Dask execution:

exc = Client(address=LocalCluster(n_workers=1, processes=True))
qbackend = Aer.get_backend('qasm_simulator')
result_ideal = qiskit.execute(circ_list, qbackend, executor=exc).result()

Details and comments

I executed QV (10 depth, 64 circuits)and measured the execution time on 3VMs with Dask executor.
Env: 3VMs, OS: CentOS8.0, Memory 64 GB, CPU: 32 x 2.0 GHz or higher Cores.
1 Node : a normal Aer simulator
1 Node (DASK) : only one node with dask executor
2 Node (DASK) : two nodes with dask executor
3 Node (DASK) : three nodes with dask executor

fig3

@chriseclectic chriseclectic added this to the Release 0.9 milestone Jul 20, 2021
@hitomitak hitomitak force-pushed the cluster_backend branch 5 times, most recently from 274320a to dee725b Compare July 21, 2021 06:44
Copy link
Member

@mtreinish mtreinish left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Overall this looks good. I did a quick initial skim over the code and have a couple inline comments. Also this definitely needs a release note (see: https://github.com/Qiskit/qiskit-aer/blob/main/CONTRIBUTING.md#release-notes ) to document the new feature and how to use it.

Also I'm wondering do you think we want to ask dask as an optional extra dependency in the setup.py. The code relies on the futures interface so it's not required, but it might be nice to let users do something like pip install 'qiskit-aer[dask]'. What do you think?

Comment on lines 162 to 164
if "executor" in run_options:
executor = run_options["executor"]
del run_options["executor"]
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Instead of doing this why not just add executor to the default options? That will also enable users to something like:

sim_backend = AerSimulator(executor=dask_magic())
sim_backend.run([qc]*10**5)
sim.backend.run([new_qc]*10**4)

and have it all run on dask.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

At first, I added the executor option as your code but DASK client variable is needed to be a local variable for the serialization. So I moved the executor option as a run option.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't really understand this comment on why it can't be a a config option? Is the probably re-using an executor between calls to run? If so can executors be copied to avoid this?

Copy link
Contributor Author

@hitomitak hitomitak Aug 16, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

class AerBackend(Backend, ABC):
    def __init__():
        self.executor = None:
   /* ... */
    def run():
        self.executor = getattr(self._options, 'executor')
class AerBackend(Backend, ABC):
  /*...*/ 
    def run():
        executor = None
        executor = self._options_configuration["executor"]

These codes occur a cloudpickle error because the executor is not a local variable. The same error is mentioned in this website

So I changed the executor option as the run option because executor is passed every calling run function. Are there any good idea to realize the executor options as a config option?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I found that the following code can be executed. I don't know why this code is ok but I can move executor option to default options.

class AerBackend(Backend, ABC):
    def __init__():
        self.executor = None:
   /* ... */
    def run():
         executor = None
        if self._executor:
           executor = self._executor
        else:
           executor = getattr(self._options, 'executor')
           delattr(self._options, 'executor')
       /* ....*/     
       aer_job_set = AerJobSet(self, job_id, self._run, experiments, executor)
       aer_job_set.submit()
       self._executor = executor
       return aer_job_set

Comment on lines 158 to 160
if backend_options and "executor" in backend_options:
executor = backend_options["executor"]
del backend_options["executor"]
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we really want to support doing this in backend_options even though it's deprecated.

@hitomitak
Copy link
Contributor Author

Thank you for the comment. I will add a release note soon.

Also I'm wondering do you think we want to ask dask as an optional extra dependency in the setup.py. The code relies on the futures interface so it's not required, but it might be nice to let users do something like pip install 'qiskit-aer[dask]'. What do you think?

I agree. I will update setup.py to install dask if users add the option.

Separate logic for a custom executor and job splitting so both can be changed separately.
* Adds `executor` option to simulators for specifying a custom executor
* Adds `max_job_size` option to simulators for specifying the max number of circuits per job when submitting to executor. If less than the number of circuits this will result in a job set.
* Add max_size options to splitting code to control job sizes
* Clean up splitting code in aerbackend so there are not so many conditionals
Change executor tests to use AerSimulator and reduce CI load
on number of run tests, since we don't need to test too many circuits.
Separate logic for a custom executor and job splitting so both can be changed separately.
* Adds `executor` option to simulators for specifying a custom executor
* Adds `max_job_size` option to simulators for specifying the max number of circuits per job when submitting to executor. If less than the number of circuits this will result in a job set.
* Add max_size options to splitting code to control job sizes
* Clean up splitting code in aerbackend so there are not so many conditionals
Change executor tests to use AerSimulator and reduce CI load
on number of run tests, since we don't need to test too many circuits.
@chriseclectic
Copy link
Member

@hitomitak I accidently pushed directly to your branch when making some changes so I reverted those commits and then added them to a PR to your branch: hitomitak#2

@hitomitak
Copy link
Contributor Author

hitomitak commented Aug 18, 2021

@chriseclectic Thank you for the refactoring. I changed the code to get executor option from qob.config to options in _get_job_submit_args function because DASK serialization is failed.

@hitomitak hitomitak force-pushed the cluster_backend branch 5 times, most recently from b6b2706 to 3910f0e Compare August 19, 2021 04:14
chriseclectic
chriseclectic previously approved these changes Aug 23, 2021
Copy link
Member

@chriseclectic chriseclectic left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hiding the Dask (and running MPI) documentation in the contributing guide, which is intended for documenting additional details for developers building from source, seems wrong. We should make a follow up PR to move this documentation to its own page in the API docs. Other than that LGTM

@@ -738,6 +738,45 @@ meta = dict['metadata']
myrank = meta['mpi_rank']
```

### Running with Threadpool and DASK
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should move this and the MPI docs in contributing to a page in the API docs, this can be done as a follow up though

CONTRIBUTING.md Outdated Show resolved Hide resolved
Copy link
Member

@chriseclectic chriseclectic left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Changes for renamed file

test/terra/backends/aer_simulator/test_job_splitting.py Outdated Show resolved Hide resolved
test/terra/backends/aer_simulator/test_job_splitting.py Outdated Show resolved Hide resolved
test/terra/backends/aer_simulator/test_executors.py Outdated Show resolved Hide resolved
test/terra/backends/aer_simulator/test_executors.py Outdated Show resolved Hide resolved
test/terra/backends/aer_simulator/test_executors.py Outdated Show resolved Hide resolved
@chriseclectic chriseclectic merged commit 7512ece into Qiskit:main Aug 23, 2021
@mtreinish mtreinish added the Changelog: New Feature Include in the Added section of the changelog label Sep 16, 2021
@iotaisolutions
Copy link

iotaisolutions commented Oct 4, 2021

Getting serialization errors while trying to submit Qiskit Circuit List to Qiskit Aer Simulator

With respect to project: qiskit-advocate/qamp-fall-21#39, I am trying to simulate few simple Qiskit circuit lists on a Qiskit Aer Simulator, with compute engine as Dask on Kubernetes Cluster, but continuously facing serialization (pickling) error(s).

I am even getting these errors while testing a DASK code example available on Qiskit Documentation Portal : https://qiskit.org/documentation/apidoc/parallel.html

** Execution Environment**

  • dask-kubernetes : 2021.3.1
  • distributed : 2021.9.0
  • minikube v1.23.1
  • Docker version 20.10.7
  • Python 3.8
  • OS : Ubuntu 20.04

Error Messages

Traceback (most recent call last):
File "/home/ubuntu/.local/lib/python3.8/site-packages/distributed/worker.py", line 3809, in dumps_function
result = cache_dumps[func]
File "/home/ubuntu/.local/lib/python3.8/site-packages/distributed/utils.py", line 1366, in getitem
value = super().getitem(key)
File "/usr/lib/python3.8/collections/init.py", line 1010, in getitem
raise KeyError(key)
KeyError: <bound method AerBackend._run of AerSimulator('aer_simulator')>

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
File "/home/ubuntu/.local/lib/python3.8/site-packages/distributed/protocol/pickle.py", line 49, in dumps
result = pickle.dumps(x, **dump_kwargs)
AttributeError: Can't pickle local object 'Client.init..'

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
File "dask_test_1.py", line 34, in
q_exec()
File "dask_test_1.py", line 30, in q_exec
result = qbackend.run(circ_list).result()
File "/home/ubuntu/.local/lib/python3.8/site-packages/qiskit/utils/deprecation.py", line 62, in wrapper
return func(*args, **kwargs)
File "/home/ubuntu/.local/lib/python3.8/site-packages/qiskit/providers/aer/backends/aerbackend.py", line 222, in run
aer_job.submit()
File "/home/ubuntu/.local/lib/python3.8/site-packages/qiskit/providers/aer/jobs/aerjobset.py", line 90, in submit
aer_job.submit()
File "/home/ubuntu/.local/lib/python3.8/site-packages/qiskit/providers/aer/jobs/aerjob.py", line 58, in submit
self._future = self._executor.submit(self._fn, self._qobj, self._job_id)
File "/home/ubuntu/.local/lib/python3.8/site-packages/distributed/client.py", line 1550, in submit
futures = self._graph_to_futures(
File "/home/ubuntu/.local/lib/python3.8/site-packages/distributed/client.py", line 2571, in _graph_to_futures
dsk = dsk.dask_distributed_pack(self, keyset, annotations)
File "/home/ubuntu/.local/lib/python3.8/site-packages/dask/highlevelgraph.py", line 1063, in dask_distributed_pack
"state": layer.dask_distributed_pack(
File "/home/ubuntu/.local/lib/python3.8/site-packages/dask/highlevelgraph.py", line 421, in dask_distributed_pack
dsk = toolz.valmap(dumps_task, dsk)
File "/home/ubuntu/.local/lib/python3.8/site-packages/toolz/dicttoolz.py", line 83, in valmap
rv.update(zip(d.keys(), map(func, d.values())))
File "/home/ubuntu/.local/lib/python3.8/site-packages/distributed/worker.py", line 3847, in dumps_task
return {"function": dumps_function(task[0]), "args": warn_dumps(task[1:])}
File "/home/ubuntu/.local/lib/python3.8/site-packages/distributed/worker.py", line 3811, in dumps_function
result = pickle.dumps(func, protocol=4)
File "/home/ubuntu/.local/lib/python3.8/site-packages/distributed/protocol/pickle.py", line 60, in dumps
result = cloudpickle.dumps(x, **dump_kwargs)
File "/home/ubuntu/.local/lib/python3.8/site-packages/cloudpickle/cloudpickle_fast.py", line 73, in dumps
cp.dump(obj)
File "/home/ubuntu/.local/lib/python3.8/site-packages/cloudpickle/cloudpickle_fast.py", line 602, in dump
return Pickler.dump(self, obj)
TypeError: cannot pickle '_asyncio.Task' object

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Changelog: New Feature Include in the Added section of the changelog
Projects
None yet
Development

Successfully merging this pull request may close these issues.

5 participants