diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md index 8ff064c409..d4118cb812 100644 --- a/CONTRIBUTING.md +++ b/CONTRIBUTING.md @@ -738,6 +738,45 @@ meta = dict['metadata'] myrank = meta['mpi_rank'] ``` +### Running with Threadpool and DASK + +Qiskit Aer runs simulation jobs on a single-worker Python multiprocessing ThreadPool executor so that all parallelization is handled by low-level OpenMP and CUDA code. However to customize job-level parallel execution of multiple circuits a user can specify a custom multiprocessing executor and control the splitting of circuits using the `executor`and `max_job_size` backend options. For large scale job parallelization on HPC clusters Qiskit Aer executors support the distributed Clients from the [Dask](https://dask.org/) library. +If you want to install dask client at the same time as Qiskit Aer, please add `dask` option as follows. This option installs Aer, dask, and distributed packages. +``` +pip install .[dask] +``` + +To use Threadpool or DASK as an executor, you need to set `executor` and `max_job_size` by `set_options` function. If both `executor` (default None) and `max_job_size` (default None) are set, Aer splits the multiple circuits to some chunk of circuits and submits them to the executor. `max_job_size` can control the number of splitting circuits. When `max_job_size` is set to 1, multiple circuits are split into one circuit and distributed to the executor. If user executes 60 circuits with the executor and `max_job_size=1`, Aer splits it to 1 circuit x 60 jobs. +If 60 circuits and `max_job_size=2`, Aer splits it to 2 circuits x 30 jobs. + +Example Usage: +Threadpool execution: + +``` +from concurrent.futures import ThreadPoolExecutor + +exc = ThreadPoolExecutor(max_workers=2) +qbackend = Aer.get_backend('qasm_simulator') +qbackend.set_options(executor=exc) +qbackend.set_options(max_job_size=3) +result = qbackend.run(circuits, seed_simulator=54321).result() +``` + +Dask execution: + +Dask client creates multi-processes so you need to gurad it by if __name__ == "__main__": block. +``` +from dask.distributed import Client + +def dask_exec(): + exc = Client(address=LocalCluster(n_workers=1, processes=True)) + qbackend.set_options(executor=exc) + qbackend.set_options(max_job_size=3) + result = qbackend.run(circuits, seed_simulator=54321).result() + +if __name__ == "__main__": + dask_exec() +``` ### Building a statically linked wheel diff --git a/qiskit/providers/aer/__init__.py b/qiskit/providers/aer/__init__.py index 3b7cbe3bc3..c32f10a879 100644 --- a/qiskit/providers/aer/__init__.py +++ b/qiskit/providers/aer/__init__.py @@ -44,12 +44,13 @@ StatevectorSimulator UnitarySimulator -Job Class -========= +Job Classes +=========== .. autosummary:: :toctree: ../stubs/ AerJob + AerJobSet Exceptions ========== @@ -70,7 +71,7 @@ # pylint: disable=wrong-import-position from .aerprovider import AerProvider -from .aerjob import AerJob +from .jobs import AerJob, AerJobSet from .aererror import AerError from .backends import * from . import library diff --git a/qiskit/providers/aer/backends/aer_simulator.py b/qiskit/providers/aer/backends/aer_simulator.py index cf851030e1..0c4fc7d818 100644 --- a/qiskit/providers/aer/backends/aer_simulator.py +++ b/qiskit/providers/aer/backends/aer_simulator.py @@ -158,6 +158,14 @@ class AerSimulator(AerBackend): certain simulation methods to either ``"single"`` or ``"double"`` precision (default: ``"double"``). + * ``executor`` (futures.Executor or None): Set a custom executor for + asynchronous running of simulation jobs (Default: None). + + * ``max_job_size`` (int or None): If the number of run circuits + exceeds this value simulation will be run as a set of of sub-jobs + on the executor. If ``None`` simulation of all circuits are submitted + to the executor as a single job (Default: None). + * ``zero_threshold`` (double): Sets the threshold for truncating small values to zero in the result data (Default: 1e-10). @@ -496,6 +504,8 @@ def _default_options(cls): method='automatic', device='CPU', precision="double", + executor=None, + max_job_size=None, zero_threshold=1e-10, validation_threshold=None, max_parallel_threads=None, diff --git a/qiskit/providers/aer/backends/aerbackend.py b/qiskit/providers/aer/backends/aerbackend.py index 466075c7b3..83c8b77bd7 100644 --- a/qiskit/providers/aer/backends/aerbackend.py +++ b/qiskit/providers/aer/backends/aerbackend.py @@ -30,9 +30,10 @@ from qiskit.qobj import QasmQobj, PulseQobj from qiskit.compiler import assemble -from ..aerjob import AerJob +from ..jobs import AerJob, AerJobSet, split_qobj from ..aererror import AerError + # Logger logger = logging.getLogger(__name__) @@ -60,6 +61,7 @@ def default(self, obj): class AerBackend(Backend, ABC): """Qiskit Aer Backend class.""" + def __init__(self, configuration, properties=None, @@ -98,6 +100,7 @@ def __init__(self, self._options_configuration = {} self._options_defaults = {} self._options_properties = {} + self._executor = None # Set available methods self._available_methods = [] if available_methods is None else available_methods @@ -126,6 +129,9 @@ def run(self, Additional Information: kwarg options specified in ``run_options`` will temporarily override any set options of the same name for the current run. + + Raises: + ValueError: if run is not implemented """ if isinstance(circuits, (QasmQobj, PulseQobj)): warnings.warn('Using a qobj for run() is deprecated and will be ' @@ -148,17 +154,18 @@ def run(self, else: qobj = assemble(circuits, self) - # Add backend options to the Job qobj - self._add_options_to_qobj(qobj, **run_options) - - # Optional validation - if validate: - self._validate(qobj) + # Add submit args for the job + experiments, executor = self._get_job_submit_args(qobj, validate=validate, **run_options) + executor = executor or self._executor # Submit job job_id = str(uuid.uuid4()) - aer_job = AerJob(self, job_id, self._run, qobj) + if isinstance(experiments, list): + aer_job = AerJobSet(self, job_id, self._run, experiments, executor) + else: + aer_job = AerJob(self, job_id, self._run, experiments, executor) aer_job.submit() + self._executor = executor return aer_job def configuration(self): @@ -319,6 +326,7 @@ def set_option(self, key, value): setattr(self._options, key, getattr(self._default_options(), key)) def set_options(self, **fields): + """Set the simulator options""" for key, value in fields.items(): self.set_option(key, value) @@ -343,9 +351,16 @@ def _set_defaults_option(self, key, value): elif key in self._options_defaults: self._options_defaults.pop(key) - def _add_options_to_qobj(self, qobj, - **run_options): + def _get_job_submit_args(self, qobj, validate=False, **run_options): """Return execution sim config dict from backend options.""" + # Get executor + executor = None + if hasattr(self._options, 'executor'): + executor = getattr(self._options, 'executor') + # We need to remove the executor from the qobj config + # since it can't be serialized though JSON/Pybind. + delattr(self._options, 'executor') + # Add options to qobj config overriding any existing fields config = qobj.config @@ -358,7 +373,14 @@ def _add_options_to_qobj(self, qobj, for key, val in run_options.items(): setattr(config, key, val) - return qobj + # Optional validation + if validate: + self._validate(qobj) + + # Split circuits for sub-jobs + experiments = split_qobj( + qobj, max_size=getattr(qobj.config, 'max_job_size', None)) + return experiments, executor def __repr__(self): """String representation of an AerBackend.""" diff --git a/qiskit/providers/aer/backends/pulse_simulator.py b/qiskit/providers/aer/backends/pulse_simulator.py index d1b109b350..3cf064e83d 100644 --- a/qiskit/providers/aer/backends/pulse_simulator.py +++ b/qiskit/providers/aer/backends/pulse_simulator.py @@ -119,7 +119,13 @@ class PulseSimulator(AerBackend): ``|1>``. Defaults to ``2``. * ``meas_return``: Measurement type, ``'single'`` or ``'avg'``. Defaults to ``'avg'``. * ``shots``: Number of shots per experiment. Defaults to ``1024``. + * ``executor``: Set a custom executor for asynchronous running of simulation + * ``max_job_size`` (int or None): If the number of run schedules + exceeds this value simulation will be run as a set of of sub-jobs + on the executor. If ``None`` simulation of all schedules are submitted + to the executor as a single job (Default: None). + jobs (Default: None). **Simulation details** @@ -187,7 +193,9 @@ def _default_options(cls): qubit_freq_est=inf, q_level_meas=1, noise_model=None, - initial_state=None) + initial_state=None, + executor=None, + max_job_size=None) # pylint: disable=arguments-differ, missing-param-doc @deprecate_arguments({'qobj': 'schedules'}) diff --git a/qiskit/providers/aer/backends/qasm_simulator.py b/qiskit/providers/aer/backends/qasm_simulator.py index fda2104750..110163a1ef 100644 --- a/qiskit/providers/aer/backends/qasm_simulator.py +++ b/qiskit/providers/aer/backends/qasm_simulator.py @@ -116,6 +116,14 @@ class QasmSimulator(AerBackend): certain simulation methods to either ``"single"`` or ``"double"`` precision (default: ``"double"``). + * ``executor`` (futures.Executor): Set a custom executor for + asynchronous running of simulation jobs (Default: None). + + * ``max_job_size`` (int or None): If the number of run circuits + exceeds this value simulation will be run as a set of of sub-jobs + on the executor. If ``None`` simulation of all circuits are submitted + to the executor as a single job (Default: None). + * ``zero_threshold`` (double): Sets the threshold for truncating small values to zero in the result data (Default: 1e-10). @@ -385,6 +393,8 @@ def _default_options(cls): shots=1024, method=None, precision="double", + executor=None, + max_job_size=None, zero_threshold=1e-10, validation_threshold=None, max_parallel_threads=None, diff --git a/qiskit/providers/aer/backends/statevector_simulator.py b/qiskit/providers/aer/backends/statevector_simulator.py index 3dc8ddac05..d300551b6c 100644 --- a/qiskit/providers/aer/backends/statevector_simulator.py +++ b/qiskit/providers/aer/backends/statevector_simulator.py @@ -63,6 +63,14 @@ class StatevectorSimulator(AerBackend): certain simulation methods to either ``"single"`` or ``"double"`` precision (default: ``"double"``). + * ``executor`` (futures.Executor): Set a custom executor for + asynchronous running of simulation jobs (Default: None). + + * ``max_job_size`` (int or None): If the number of run circuits + exceeds this value simulation will be run as a set of of sub-jobs + on the executor. If ``None`` simulation of all circuits aer submitted + to the executor as a single job (Default: None). + * ``zero_threshold`` (double): Sets the threshold for truncating small values to zero in the result data (Default: 1e-10). @@ -183,6 +191,8 @@ def _default_options(cls): shots=1024, method="automatic", precision="double", + executor=None, + max_job_size=None, zero_threshold=1e-10, validation_threshold=None, max_parallel_threads=None, diff --git a/qiskit/providers/aer/backends/unitary_simulator.py b/qiskit/providers/aer/backends/unitary_simulator.py index b0353757c7..55dd672e8b 100644 --- a/qiskit/providers/aer/backends/unitary_simulator.py +++ b/qiskit/providers/aer/backends/unitary_simulator.py @@ -65,6 +65,14 @@ class UnitarySimulator(AerBackend): certain simulation methods to either ``"single"`` or ``"double"`` precision (default: ``"double"``). + * ``executor`` (futures.Executor): Set a custom executor for + asynchronous running of simulation jobs (Default: None). + + * ``max_job_size`` (int or None): If the number of run circuits + exceeds this value simulation will be run as a set of of sub-jobs + on the executor. If ``None`` simulation of all circuits are submitted + to the executor as a single job (Default: None). + * ``"initial_unitary"`` (matrix_like): Sets a custom initial unitary matrix for the simulation instead of identity (Default: None). @@ -180,6 +188,8 @@ def _default_options(cls): shots=1024, method="automatic", precision="double", + executor=None, + max_job_size=None, zero_threshold=1e-10, seed_simulator=None, validation_threshold=None, diff --git a/qiskit/providers/aer/jobs/__init__.py b/qiskit/providers/aer/jobs/__init__.py new file mode 100644 index 0000000000..610ce28117 --- /dev/null +++ b/qiskit/providers/aer/jobs/__init__.py @@ -0,0 +1,20 @@ +# -*- coding: utf-8 -*- + +# This code is part of Qiskit. +# +# (C) Copyright IBM 2018, 2019. +# +# This code is licensed under the Apache License, Version 2.0. You may +# obtain a copy of this license in the LICENSE.txt file in the root directory +# of this source tree or at http://www.apache.org/licenses/LICENSE-2.0. +# +# Any modifications or derivative works of this code must retain this +# copyright notice, and modified files need to carry a notice indicating +# that they have been altered from the originals. +""" +Aer simulator job management. +""" + +from .aerjob import AerJob +from .aerjobset import AerJobSet +from .utils import split_qobj diff --git a/qiskit/providers/aer/aerjob.py b/qiskit/providers/aer/jobs/aerjob.py similarity index 73% rename from qiskit/providers/aer/aerjob.py rename to qiskit/providers/aer/jobs/aerjob.py index e65367566f..def068dd0b 100644 --- a/qiskit/providers/aer/aerjob.py +++ b/qiskit/providers/aer/jobs/aerjob.py @@ -14,34 +14,13 @@ """This module implements the job class used for AerBackend objects.""" -import warnings -from concurrent import futures import logging -import functools from qiskit.providers import JobV1 as Job from qiskit.providers import JobStatus, JobError +from .utils import DEFAULT_EXECUTOR, requires_submit -logger = logging.getLogger(__name__) - - -def requires_submit(func): - """ - Decorator to ensure that a submit has been performed before - calling the method. - - Args: - func (callable): test function to be decorated. - - Returns: - callable: the decorated function. - """ - @functools.wraps(func) - def _wrapper(self, *args, **kwargs): - if self._future is None: - raise JobError("Job not submitted yet!. You have to .submit() first!") - return func(self, *args, **kwargs) - return _wrapper +LOGGER = logging.getLogger(__name__) class AerJob(Job): @@ -51,17 +30,11 @@ class AerJob(Job): _executor (futures.Executor): executor to handle asynchronous jobs """ - _executor = futures.ThreadPoolExecutor(max_workers=1) - - def __init__(self, backend, job_id, fn, qobj, *args): + def __init__(self, backend, job_id, fn, qobj, executor=None): super().__init__(backend, job_id) self._fn = fn self._qobj = qobj - if args: - warnings.warn('Using *args for AerJob is deprecated. All backend' - ' options should be contained in the assembled Qobj.', - DeprecationWarning) - self._args = args + self._executor = executor or DEFAULT_EXECUTOR self._future = None def submit(self): @@ -70,16 +43,11 @@ def submit(self): Raises: QobjValidationError: if the JSON serialization of the Qobj passed during construction does not validate against the Qobj schema. - JobError: if trying to re-submit the job. """ if self._future is not None: - raise JobError("We have already submitted the job!") - - self._future = self._executor.submit(self._fn, - self._qobj, - self._job_id, - *self._args) + raise JobError("Aer job has already been submitted.") + self._future = self._executor.submit(self._fn, self._qobj, self._job_id) @requires_submit def result(self, timeout=None): @@ -142,3 +110,7 @@ def qobj(self): Qobj: the Qobj submitted for this job. """ return self._qobj + + def executor(self): + """Return the executor for this job""" + return self._executor diff --git a/qiskit/providers/aer/jobs/aerjobset.py b/qiskit/providers/aer/jobs/aerjobset.py new file mode 100644 index 0000000000..04ea05788d --- /dev/null +++ b/qiskit/providers/aer/jobs/aerjobset.py @@ -0,0 +1,371 @@ +# -*- coding: utf-8 -*- + +# This code is part of Qiskit. +# +# (C) Copyright IBM 2019, 2020. +# +# This code is licensed under the Apache License, Version 2.0. You may +# obtain a copy of this license in the LICENSE.txt file in the root directory +# of this source tree or at http://www.apache.org/licenses/LICENSE-2.0. +# +# Any modifications or derivative works of this code must retain this +# copyright notice, and modified files need to carry a notice indicating +# that they have been altered from the originals. + +# pylint: disable=arguments-differ + +"""A set of cluster jobs for Aer.""" + +from typing import List, Optional, Union, Tuple, Iterable +import time +import logging +import copy +import datetime +import uuid + +from qiskit.circuit import QuantumCircuit +from qiskit.pulse import Schedule +from qiskit.qobj import QasmQobj +from qiskit.providers import JobV1 as Job +from qiskit.providers import JobStatus, JobError +from qiskit.result import Result + +from .utils import DEFAULT_EXECUTOR, requires_submit +from .aerjob import AerJob + +logger = logging.getLogger(__name__) + + +class AerJobSet(Job): + """A set of cluster jobs. + + An instance of this class is returned when you submit experiments with + executor option. + It provides methods that allow you to interact + with the jobs as a single entity. For example, you can retrieve the results + for all of the jobs using :meth:`result()` and cancel all jobs using + :meth:`cancel()`. + """ + + def __init__(self, backend, job_id, func, experiments: List[QasmQobj], executor=None): + """AerJobSet constructor. + + Args: + backend(Aerbackend): Aerbackend. + job_id(int): Job Id. + func(fun): Callabled function. + experiments(List[QasmQobj]): List[QasmQobjs] to execute. + executor(ThreadPoolExecutor or dask.distributed.client): The executor + to be used to submit the job. + + """ + super().__init__(backend, job_id) + self._experiments = experiments + + # Used for caching + self._future = None + self._futures = [] + self._results = None + self._fn = func + self._executor = executor or DEFAULT_EXECUTOR + self._start_time = None + self._end_time = None + + def submit(self): + """Execute this set of jobs on an executor. + + Raises: + RuntimeError: If the jobs were already submitted. + """ + if self._futures: + raise RuntimeError( + 'The jobs for this managed job set have already been submitted.') + + self._future = True + self._start_time = datetime.datetime.now() + for i, exp in enumerate(self._experiments): + job_id = str(uuid.uuid4()) + logger.debug("Job %s submitted", i + 1) + aer_job = AerJob(self._backend, job_id, self._fn, exp, self._executor) + aer_job.submit() + aer_job._future.add_done_callback(self._set_end_time) + self._futures.append(aer_job) + + @requires_submit + def status(self, worker: Union[None, int, Iterable[int]] + ) -> Union[JobStatus, List[JobStatus]]: + """Return the status of each job in this set. + + Args + worker: Worker id. When None, all workers' statuses are returned. + + Returns: + A list of job statuses. + """ + if isinstance(worker, int): + aer_job = self._futures[worker] + return aer_job.satus() + elif isinstance(worker, Iterable): + job_list = [] + for worker_id in worker: + aer_job = self._futures[worker_id] + job_list.append(aer_job.status()) + return job_list + else: + return [aer.status() for aer in self._futures] + + @requires_submit + def result(self, + timeout: Optional[float] = None, + ) -> Result: + """Return the results of the jobs as a single Result object. + + This call will block until all job results become available or + the timeout is reached. + + Args: + timeout: Number of seconds to wait for job results. + + Returns: + qiskit.Result: Result object + + Raises: + JobError: if unable to retrieve all job results before the + specified timeout. + + """ + res = self.worker_results(worker=None, timeout=timeout) + return self._combine_results(res) + + @requires_submit + def worker_results(self, + worker: Union[None, int, Iterable[int]], + timeout: Optional[float] = None, + ) -> Union[Result, List[Result]]: + """Return the result of the jobs specified with worker_id. + + When the worker is None, this call return all worker's result. + + Args: + worker: Worker id to wait for job result. + timeout: Number of seconds to wait for job results. + + Returns: + qiskit.Result: Result object + instance that can be used to retrieve results + for individual experiments. + + Raises: + JobError: if unable to retrieve all job results before the + specified timeout. + """ + + # We'd like to use futures.as_completed or futures.wait + # however this excludes the use of dask as executor + # because dask's futures are not ~exactly~ the same. + res = [] + + if isinstance(worker, int): + return self._get_worker_result(worker, timeout) + elif isinstance(worker, Iterable): + for worker_id in worker: + res.append(self._get_worker_result(worker_id, timeout)) + return res + else: + for worker_id in range(len(self._futures)): + res.append(self._get_worker_result(worker_id, timeout)) + return res + + def _get_worker_result(self, worker: int, timeout: Optional[float] = None): + """Return the result of the jobs specified with worker_id. + + this call return all worker's result specified worker and + block until job result become available or the timeout is reached. + Analogous to dask.client.gather() + + Args: + worker: Worker id to wait for job result. + timeout: Number of seconds to wait for job results. + + Returns: + qiskit.Result: Result object + instance that can be used to retrieve a result. + + Raises: + JobError: if unable to retrieve all job results before the + specified timeout. + """ + start_time = time.time() + original_timeout = timeout + aer_job = self._futures[worker] + + try: + result = aer_job.result(timeout=timeout) + if result is None or not result.success: + if result: + logger.warning('ClusterJob %s Error: %s', aer_job.name(), result.header) + else: + logger.warning('ClusterJob %s did not return a result', aer_job.name()) + except JobError: + raise JobError( + 'Timeout while waiting for the results of experiment {}'.format( + aer_job.name())) + + if timeout: + timeout = original_timeout - (time.time() - start_time) + if timeout <= 0: + raise JobError( + "Timeout while waiting for JobSet results") + return result + + def _combine_results(self, + results: List[Union[Result, None]] = None + ) -> Result: + """Combine results from all jobs into a single `Result`. + + Note: + Since the order of the results must match the order of the initial + experiments, job results can only be combined if all jobs succeeded. + + Args: + results: Result will be combined. + Returns: + A :class:`~qiskit.result.Result` object that contains results from + all jobs. + Raises: + JobError: If results cannot be combined because some jobs failed. + """ + if not results: + raise JobError( + "Results cannot be combined - no results.") + + # find first non-null result and copy it's config + _result = next((r for r in results if r is not None), None) + if _result: + combined_result = copy.deepcopy(_result) + combined_result.results = [] + else: + raise JobError( + "Results cannot be combined - no results.") + + for each_result in results: + if each_result is not None: + combined_result.results.extend(each_result.results) + + combined_result_dict = combined_result.to_dict() + + if self._end_time is None: + self._end_time = datetime.datetime.now() + + if self._start_time: + _time_taken = self._end_time - self._start_time + combined_result_dict["time_taken"] = _time_taken.total_seconds() + else: + combined_result_dict["time_taken"] = 0 + + combined_result_dict["date"] = datetime.datetime.isoformat(self._end_time) + combined_result = Result.from_dict(combined_result_dict) + return combined_result + + @requires_submit + def cancel(self) -> None: + """Cancel all jobs in this job set.""" + for aer_job in self._futures: + aer_job.cancel() + + @requires_submit + def job(self, experiment: Union[str, QuantumCircuit, Schedule]) -> Tuple[AerJob, int]: + """Retrieve the job used to submit the specified experiment and its index. + + Args: + experiment: Retrieve the job used to submit this experiment. Several + types are accepted for convenience: + + * str: The name of the experiment. + * QuantumCircuit: The name of the circuit instance will be used. + * Schedule: The name of the schedule instance will be used. + + Returns: + A tuple of the job used to submit the experiment and the experiment index. + + Raises: + JobError: If the job for the experiment could not be found. + """ + worker_index = self.worker(experiment) + return self.worker_job(worker_index) + + @requires_submit + def worker(self, + experiment: Union[str, QuantumCircuit, Schedule] + ) -> Union[int, List[int]]: + """Retrieve the index of job. + + Args: + experiment: Retrieve the job used to submit this experiment. Several + types are accepted for convenience: + + * str: The name of the experiment. + * QuantumCircuit: The name of the circuit instance will be used. + * Schedule: The name of the schedule instance will be used. + + Returns: + list or integer value of the job id + + Raises: + JobError: If the job for the experiment could not be found. + """ + + if isinstance(experiment, (QuantumCircuit, Schedule)): + experiment = experiment.name + job_list = [] + for job in self._futures: + for i, exp in enumerate(job.qobj().experiments): + if hasattr(exp.header, 'name') and exp.header.name == experiment: + job_list.append(i) + + if len(job_list) == 1: + return job_list[0] + elif len(job_list) > 1: + return job_list + + raise JobError( + 'Unable to find the job for experiment {}.'.format(experiment)) + + @requires_submit + def worker_job(self, + worker: Union[None, int, Iterable[int]] + ) -> Union[AerJob, List[AerJob]]: + """Retrieve the job specified with job's id + + Args: + worker: retrive job used to submit with this job id. + + Returns: + A list of :class:`~qiskit.providers.aer.AerJob` + instances that represents the submitted jobs. + + Raises: + JobError: If the job for the experiment could not be found. + """ + aer_jobs = [] + if isinstance(worker, int): + return self._futures[worker] + elif isinstance(worker, Iterable): + for worker_id in worker: + aer_jobs.append(self._futures[worker_id]) + return aer_jobs + else: + return self._futures + + def _set_end_time(self, future): + """Set job's end time to calculate "time_taken" value + + Args: + future(concurrent.futures or dask.distributed.futures): callback future object + """ + # pylint: disable=unused-argument + self._end_time = datetime.datetime.now() + + def executor(self): + """Return the executor for this job""" + return self._executor diff --git a/qiskit/providers/aer/jobs/utils.py b/qiskit/providers/aer/jobs/utils.py new file mode 100644 index 0000000000..9e1c847c8f --- /dev/null +++ b/qiskit/providers/aer/jobs/utils.py @@ -0,0 +1,89 @@ +# This code is part of Qiskit. +# +# (C) Copyright IBM 2017, 2019. +# +# This code is licensed under the Apache License, Version 2.0. You may +# obtain a copy of this license in the LICENSE.txt file in the root directory +# of this source tree or at http://www.apache.org/licenses/LICENSE-2.0. +# +# Any modifications or derivative works of this code must retain this +# copyright notice, and modified files need to carry a notice indicating +# that they have been altered from the originals. + +"""Utility functions for Aer job management.""" +import uuid +import copy +from math import ceil +from functools import singledispatch, update_wrapper, wraps +from concurrent.futures import ThreadPoolExecutor + +from qiskit.providers import JobError + +DEFAULT_EXECUTOR = ThreadPoolExecutor(max_workers=1) + + +def requires_submit(func): + """ + Decorator to ensure that a submit has been performed before + calling the method. + + Args: + func (callable): test function to be decorated. + + Returns: + callable: the decorated function. + """ + @wraps(func) + def _wrapper(self, *args, **kwargs): + if self._future is None: + raise JobError("Job not submitted yet!. You have to .submit() first!") + return func(self, *args, **kwargs) + return _wrapper + + +def methdispatch(func): + """ + Returns a wrapper function that selects which registered function + to call based on the type of args[2] + """ + dispatcher = singledispatch(func) + + def wrapper(*args, **kw): + return dispatcher.dispatch(args[2].__class__)(*args, **kw) + wrapper.register = dispatcher.register + update_wrapper(wrapper, func) + return wrapper + + +def split_qobj(qobj, max_size=None, qobj_id=None): + """Split a qobj and return a list of qobjs each with a single experiment. + + Args: + qobj (Qobj): The input qobj object to split + max_size (int or None): the maximum number of circuits per job. If + None don't split (Default: None). + qobj_id (str): Optional, set a fixed qobj ID for all subjob qobjs. + + Returns: + List: A list of qobjs. + """ + # Check if we don't need to split + if max_size is None or not max_size > 0: + return qobj + num_jobs = ceil(len(qobj.experiments) / max_size) + if num_jobs == 1: + return qobj + + # Check for parameterizations + params = getattr(qobj.config, 'parameterizations', None) + qobjs = [] + for i in range(num_jobs): + sub_id = qobj_id or str(uuid.uuid4()) + indices = slice(i * max_size, (i + 1) * max_size) + sub_exp = qobj.experiments[indices] + sub_config = qobj.config + if params is not None: + sub_config = copy.copy(qobj.config) + sub_config.parameterizations = params[indices] + qobjs.append(type(qobj)(sub_id, sub_config, sub_exp, qobj.header)) + return qobjs diff --git a/setup.py b/setup.py index d4e8410b23..285a1084bb 100644 --- a/setup.py +++ b/setup.py @@ -79,6 +79,11 @@ def install_needed_req(import_name, package_name=None, min_version=None, max_ver 'scikit-build>=0.11.0', 'cmake!=3.17,!=3.17.0', ] + +extras_requirements = { + "dask": ["dask", "distributed"] +} + if not _DISABLE_CONAN: setup_requirements.append('conan>=1.22.2') @@ -140,6 +145,7 @@ def install_needed_req(import_name, package_name=None, min_version=None, max_ver install_requires=requirements, setup_requires=setup_requirements, include_package_data=True, + extras_require=extras_requirements, cmake_args=cmake_args, keywords="qiskit aer simulator quantum addon backend", zip_safe=False diff --git a/test/terra/backends/aer_simulator/test_executors.py b/test/terra/backends/aer_simulator/test_executors.py new file mode 100644 index 0000000000..0fb2ddf2bb --- /dev/null +++ b/test/terra/backends/aer_simulator/test_executors.py @@ -0,0 +1,120 @@ +# This code is part of Qiskit. +# +# (C) Copyright IBM 2018, 2021. +# +# This code is licensed under the Apache License, Version 2.0. You may +# obtain a copy of this license in the LICENSE.txt file in the root directory +# of this source tree or at http://www.apache.org/licenses/LICENSE-2.0. +# +# Any modifications or derivative works of this code must retain this +# copyright notice, and modified files need to carry a notice indicating +# that they have been altered from the originals. +""" +AerSimualtor options tests +""" +import logging +from math import ceil +import concurrent.futures + +from ddt import ddt +from qiskit import QuantumCircuit, transpile +from qiskit.circuit.random import random_circuit +from qiskit.quantum_info import Statevector +from qiskit.providers.aer.jobs import AerJob, AerJobSet +from test.terra.backends.simulator_test_case import ( + SimulatorTestCase, supported_methods) + + +DASK = False + +try: + from dask.distributed import LocalCluster, Client + DASK = True +except ImportError: + DASK = False + + +def run_random_circuits(backend, shots=None, **run_options): + """Test random circuits on different executor fictures""" + job_size = 10 + circuits = [random_circuit(num_qubits=2, depth=2, seed=i) + for i in range(job_size)] + # Sample references counts + targets = [] + for circ in circuits: + state = Statevector(circ) + state.seed = 101 + targets.append(state.sample_counts(shots=shots)) + + # Add measurements for simulation + for circ in circuits: + circ.measure_all() + + circuits = transpile(circuits, backend) + job = backend.run(circuits, shots=shots, **run_options) + result = job.result() + return result, circuits, targets + + +class CBFixture(SimulatorTestCase): + """Extension tests for Aerbackend with cluster backend""" + @classmethod + def setUpClass(cls): + super().setUpClass() + """Override me with an executor init.""" + cls._test_executor = None + + @classmethod + def tearDownClass(cls): + super().tearDownClass() + if cls._test_executor: + cls._test_executor.shutdown() + + def backend(self, **options): + """Return AerSimulator backend using current class options""" + return super().backend(executor=self._test_executor, **options) + + +@ddt +class TestDaskExecutor(CBFixture): + """Tests of Dask executor""" + + @classmethod + def setUpClass(cls): + super().setUpClass() + if DASK: + cls._test_executor = Client(address=LocalCluster(n_workers=1, processes=True)) + + def setUp(self): + super().setUp() + if not DASK: + self.skipTest('Dask not installed, skipping ClusterBackend-dask tests') + + @supported_methods(['statevector'], [None, 1, 2, 3]) + def test_random_circuits_job(self, method, device, max_job_size): + """Test random circuits with custom executor.""" + shots = 4000 + backend = self.backend( + method=method, device=device, max_job_size=max_job_size) + result, circuits, targets = run_random_circuits(backend, shots=shots) + self.assertSuccess(result) + self.compare_counts(result, circuits, targets, hex_counts=False, delta=0.05 * shots) + +@ddt +class TestThreadPoolExecutor(CBFixture): + """Tests of ThreadPool executor""" + @classmethod + def setUpClass(cls): + super().setUpClass() + cls._test_executor = None + cls._test_executor = concurrent.futures.ThreadPoolExecutor(max_workers=2) + + @supported_methods(['statevector'], [None, 1, 2, 3]) + def test_random_circuits_job(self, method, device, max_job_size): + """Test random circuits with custom executor.""" + shots = 4000 + backend = self.backend( + method=method, device=device, max_job_size=max_job_size) + result, circuits, targets = run_random_circuits(backend, shots=shots) + self.assertSuccess(result) + self.compare_counts(result, circuits, targets, hex_counts=False, delta=0.05 * shots) diff --git a/test/terra/backends/aer_simulator/test_job_splitting.py b/test/terra/backends/aer_simulator/test_job_splitting.py new file mode 100644 index 0000000000..126fee8245 --- /dev/null +++ b/test/terra/backends/aer_simulator/test_job_splitting.py @@ -0,0 +1,82 @@ +# This code is part of Qiskit. +# +# (C) Copyright IBM 2018, 2019. +# +# This code is licensed under the Apache License, Version 2.0. You may +# obtain a copy of this license in the LICENSE.txt file in the root directory +# of this source tree or at http://www.apache.org/licenses/LICENSE-2.0. +# +# Any modifications or derivative works of this code must retain this +# copyright notice, and modified files need to carry a notice indicating +# that they have been altered from the originals. + +# pylint: disable=arguments-differ + +import unittest +import logging +from ddt import ddt, data + +from qiskit import transpile, assemble +from qiskit.circuit.random import random_circuit +from qiskit.providers.aer.jobs import split_qobj +from test.terra.reference.ref_snapshot_expval import ( + snapshot_expval_circuits, snapshot_expval_circuit_parameterized) + +from test.terra.backends.simulator_test_case import SimulatorTestCase + + +@ddt +class TestJobSplitting(SimulatorTestCase): + """Test job splitting option""" + + @staticmethod + def parameterized_circuits(): + """Return ParameterizedQobj for settings.""" + pcirc1, param1 = snapshot_expval_circuit_parameterized(single_shot=False, + measure=True, + snapshot=False) + circuits2to4 = snapshot_expval_circuits(pauli=True, + skip_measure=False, + single_shot=False) + pcirc2, param2 = snapshot_expval_circuit_parameterized(single_shot=False, + measure=True, + snapshot=False) + circuits = [pcirc1] + circuits2to4 + [pcirc2] + params = [param1, [], [], [], param2] + return circuits, params + + def split_compare(self, circs, parameterizations=None): + """Qobj split test""" + qobj = assemble(circs, + parameterizations=parameterizations, + qobj_id='testing') + if parameterizations: + qobjs = [assemble(c, parameterizations=[p], + qobj_id='testing') for (c, p) in zip(circs, parameterizations)] + else: + qobjs = [assemble(c, qobj_id='testing') for c in circs] + + test_qobjs = split_qobj(qobj, max_size=1, qobj_id='testing') + self.assertEqual(len(test_qobjs), len(qobjs)) + for ref, test in zip(qobjs, test_qobjs): + self.assertEqual(ref, test) + + def test_split(self): + """Circuits split test""" + shots = 2000 + backend = self.backend(max_job_size=1) + circs = [random_circuit(num_qubits=2, depth=4, measure=True, seed=i) + for i in range(2)] + circs = transpile(circs, backend) + self.split_compare(circs) + + def test_parameterized_split(self): + """Parameterized circuits split test""" + shots = 2000 + backend = self.backend(max_job_size=1) + circs, params = self.parameterized_circuits() + self.split_compare(circs, parameterizations=params) + + +if __name__ == '__main__': + unittest.main() diff --git a/test/terra/backends/statevector_simulator/statevector_gates.py b/test/terra/backends/statevector_simulator/statevector_gates.py index 7891741e09..74839ab35d 100644 --- a/test/terra/backends/statevector_simulator/statevector_gates.py +++ b/test/terra/backends/statevector_simulator/statevector_gates.py @@ -111,7 +111,8 @@ def test_gate(self, gate_cls, num_angles, has_ctrl_qubits, basis_gates): for circuit in circuits: target = Statevector.from_instruction(circuit) result = execute(circuit, self.SIMULATOR, - basis_gates=basis_gates).result() + basis_gates=basis_gates, + **self.BACKEND_OPTS).result() self.assertSuccess(result) value = Statevector(result.get_statevector(0)) self.assertTrue(target.equiv(value), diff --git a/test/terra/backends/unitary_simulator/unitary_gates.py b/test/terra/backends/unitary_simulator/unitary_gates.py index cd3c426d60..e4390f5688 100644 --- a/test/terra/backends/unitary_simulator/unitary_gates.py +++ b/test/terra/backends/unitary_simulator/unitary_gates.py @@ -107,7 +107,8 @@ def test_gate(self, gate_cls, num_angles, basis_gates): for circuit in circuits: target = Operator(circuit) result = execute(circuit, self.SIMULATOR, - basis_gates=basis_gates).result() + basis_gates=basis_gates, + **self.BACKEND_OPTS).result() self.assertSuccess(result) value = Operator(result.get_unitary(0)) self.assertTrue(target.equiv(value), diff --git a/test/terra/extensions/test_wrappers.py b/test/terra/extensions/test_wrappers.py index 6099effddd..c0c6b4d2be 100644 --- a/test/terra/extensions/test_wrappers.py +++ b/test/terra/extensions/test_wrappers.py @@ -50,7 +50,7 @@ def _create_qobj(self, backend, noise_model=None): qobj = assemble(transpile(circuit, backend), backend) opts = {'max_parallel_threads': 1, 'library_dir': LIBRARY_DIR} - backend._add_options_to_qobj(qobj, **opts, noise_model=noise_model) + qobj, _ = backend._get_job_submit_args(qobj, **opts, noise_model=noise_model) return qobj def _map_and_test(self, cfunc, qobj):