From 2369761916b8f889e93648a28bee11675ee52c2f Mon Sep 17 00:00:00 2001 From: Takashi Imamichi <31178928+t-imamichi@users.noreply.github.com> Date: Fri, 15 Mar 2024 23:18:49 +0400 Subject: [PATCH] Add BackendSamplerV2 (#11928) * add BackendSamplerV2 * reno * Apply suggestions from code review Co-authored-by: Ian Hincks Co-authored-by: Ikko Hamamura * allow BackendV1 * update docstring * add options * move default_shots to options and update doc/reno --------- Co-authored-by: Ian Hincks Co-authored-by: Ikko Hamamura --- qiskit/primitives/__init__.py | 2 + qiskit/primitives/backend_sampler_v2.py | 233 +++++++ ...d-backend-sampler-v2-5e40135781eebc7f.yaml | 28 + .../primitives/test_backend_sampler_v2.py | 655 ++++++++++++++++++ 4 files changed, 918 insertions(+) create mode 100644 qiskit/primitives/backend_sampler_v2.py create mode 100644 releasenotes/notes/add-backend-sampler-v2-5e40135781eebc7f.yaml create mode 100644 test/python/primitives/test_backend_sampler_v2.py diff --git a/qiskit/primitives/__init__.py b/qiskit/primitives/__init__.py index 83967600153d..dfc315676e92 100644 --- a/qiskit/primitives/__init__.py +++ b/qiskit/primitives/__init__.py @@ -405,6 +405,7 @@ BaseSamplerV2 StatevectorSampler + BackendSamplerV2 Results V2 ---------- @@ -475,3 +476,4 @@ from .statevector_estimator import StatevectorEstimator from .statevector_sampler import StatevectorSampler from .backend_estimator_v2 import BackendEstimatorV2 +from .backend_sampler_v2 import BackendSamplerV2 diff --git a/qiskit/primitives/backend_sampler_v2.py b/qiskit/primitives/backend_sampler_v2.py new file mode 100644 index 000000000000..ddd7b0b9b2fc --- /dev/null +++ b/qiskit/primitives/backend_sampler_v2.py @@ -0,0 +1,233 @@ +# This code is part of Qiskit. +# +# (C) Copyright IBM 2024. +# +# This code is licensed under the Apache License, Version 2.0. You may +# obtain a copy of this license in the LICENSE.txt file in the root directory +# of this source tree or at http://www.apache.org/licenses/LICENSE-2.0. +# +# Any modifications or derivative works of this code must retain this +# copyright notice, and modified files need to carry a notice indicating +# that they have been altered from the originals. + +"""Sampler V2 implementation for an arbitrary Backend object.""" + +from __future__ import annotations + +import warnings +from dataclasses import dataclass +from typing import Iterable + +import numpy as np +from numpy.typing import NDArray + +from qiskit.circuit import QuantumCircuit +from qiskit.primitives.backend_estimator import _run_circuits +from qiskit.primitives.base import BaseSamplerV2 +from qiskit.primitives.containers import ( + BitArray, + PrimitiveResult, + PubResult, + SamplerPubLike, + make_data_bin, +) +from qiskit.primitives.containers.bit_array import _min_num_bytes +from qiskit.primitives.containers.sampler_pub import SamplerPub +from qiskit.primitives.primitive_job import PrimitiveJob +from qiskit.providers.backend import BackendV1, BackendV2 +from qiskit.result import Result + + +@dataclass +class Options: + """Options for :class:`~.BackendSamplerV2`""" + + default_shots: int = 1024 + """The default shots to use if none are specified in :meth:`~.run`. + Default: 1024. + """ + + seed_simulator: int | None = None + """The seed to use in the simulator. If None, a random seed will be used. + Default: None. + """ + + +@dataclass +class _MeasureInfo: + creg_name: str + num_bits: int + num_bytes: int + start: int + + +class BackendSamplerV2(BaseSamplerV2): + """Evaluates bitstrings for provided quantum circuits + + The :class:`~.BackendSamplerV2` class is a generic implementation of the + :class:`~.BaseSamplerV2` interface that is used to wrap a :class:`~.BackendV2` + (or :class:`~.BackendV1`) object in the class :class:`~.BaseSamplerV2` API. It + facilitates using backends that do not provide a native + :class:`~.BaseSamplerV2` implementation in places that work with + :class:`~.BaseSamplerV2`. However, + if you're using a provider that has a native implementation of + :class:`~.BaseSamplerV2`, it is a better choice to leverage that native + implementation as it will likely include additional optimizations and be + a more efficient implementation. The generic nature of this class + precludes doing any provider- or backend-specific optimizations. + + This class does not perform any measurement or gate mitigation. + + Each tuple of ``(circuit, parameter values, shots)``, called a sampler + primitive unified bloc (PUB), produces its own array-valued result. The :meth:`~run` method can + be given many pubs at once. + + The options for :class:`~.BackendSamplerV2` consist of the following items. + + * ``default_shots``: The default shots to use if none are specified in :meth:`~run`. + Default: 1024. + + * ``seed_simulator``: The seed to use in the simulator. If None, a random seed will be used. + Default: None. + + .. note:: + + This class requires a backend that supports ``memory`` option. + + """ + + def __init__( + self, + *, + backend: BackendV1 | BackendV2, + options: dict | None = None, + ): + """ + Args: + backend: The backend to run the primitive on. + options: The options to control the default shots (``default_shots``) and + the random seed for the simulator (``seed_simulator``). + """ + self._backend = backend + self._options = Options(**options) if options else Options() + + @property + def backend(self) -> BackendV1 | BackendV2: + """Returns the backend which this sampler object based on.""" + return self._backend + + @property + def options(self) -> Options: + """Return the options""" + return self._options + + def run( + self, pubs: Iterable[SamplerPubLike], *, shots: int | None = None + ) -> PrimitiveJob[PrimitiveResult[PubResult]]: + if shots is None: + shots = self._options.default_shots + coerced_pubs = [SamplerPub.coerce(pub, shots) for pub in pubs] + self._validate_pubs(coerced_pubs) + job = PrimitiveJob(self._run, coerced_pubs) + job._submit() + return job + + def _validate_pubs(self, pubs: list[SamplerPub]): + for i, pub in enumerate(pubs): + if len(pub.circuit.cregs) == 0: + warnings.warn( + f"The {i}-th pub's circuit has no output classical registers and so the result " + "will be empty. Did you mean to add measurement instructions?", + UserWarning, + ) + + def _run(self, pubs: Iterable[SamplerPub]) -> PrimitiveResult[PubResult]: + results = [self._run_pub(pub) for pub in pubs] + return PrimitiveResult(results) + + def _run_pub(self, pub: SamplerPub) -> PubResult: + meas_info, max_num_bytes = _analyze_circuit(pub.circuit) + bound_circuits = pub.parameter_values.bind_all(pub.circuit) + arrays = { + item.creg_name: np.zeros( + bound_circuits.shape + (pub.shots, item.num_bytes), dtype=np.uint8 + ) + for item in meas_info + } + flatten_circuits = np.ravel(bound_circuits).tolist() + result_memory, _ = _run_circuits( + flatten_circuits, + self._backend, + memory=True, + shots=pub.shots, + seed_simulator=self._options.seed_simulator, + ) + memory_list = _prepare_memory(result_memory, max_num_bytes) + + for samples, index in zip(memory_list, np.ndindex(*bound_circuits.shape)): + for item in meas_info: + ary = _samples_to_packed_array(samples, item.num_bits, item.start) + arrays[item.creg_name][index] = ary + + data_bin_cls = make_data_bin( + [(item.creg_name, BitArray) for item in meas_info], + shape=bound_circuits.shape, + ) + meas = { + item.creg_name: BitArray(arrays[item.creg_name], item.num_bits) for item in meas_info + } + data_bin = data_bin_cls(**meas) + return PubResult(data_bin, metadata={}) + + +def _analyze_circuit(circuit: QuantumCircuit) -> tuple[list[_MeasureInfo], int]: + meas_info = [] + max_num_bits = 0 + for creg in circuit.cregs: + name = creg.name + num_bits = creg.size + start = circuit.find_bit(creg[0]).index + meas_info.append( + _MeasureInfo( + creg_name=name, + num_bits=num_bits, + num_bytes=_min_num_bytes(num_bits), + start=start, + ) + ) + max_num_bits = max(max_num_bits, start + num_bits) + return meas_info, _min_num_bytes(max_num_bits) + + +def _prepare_memory(results: list[Result], num_bytes: int) -> NDArray[np.uint8]: + lst = [] + for res in results: + for exp in res.results: + if hasattr(exp.data, "memory") and exp.data.memory: + data = b"".join(int(i, 16).to_bytes(num_bytes, "big") for i in exp.data.memory) + data = np.frombuffer(data, dtype=np.uint8).reshape(-1, num_bytes) + else: + # no measure in a circuit + data = np.zeros((exp.shots, num_bytes), dtype=np.uint8) + lst.append(data) + ary = np.array(lst, copy=False) + return np.unpackbits(ary, axis=-1, bitorder="big") + + +def _samples_to_packed_array( + samples: NDArray[np.uint8], num_bits: int, start: int +) -> NDArray[np.uint8]: + # samples of `Backend.run(memory=True)` will be the order of + # clbit_last, ..., clbit_1, clbit_0 + # place samples in the order of clbit_start+num_bits-1, ..., clbit_start+1, clbit_start + if start == 0: + ary = samples[:, -start - num_bits :] + else: + ary = samples[:, -start - num_bits : -start] + # pad 0 in the left to align the number to be mod 8 + # since np.packbits(bitorder='big') pads 0 to the right. + pad_size = -num_bits % 8 + ary = np.pad(ary, ((0, 0), (pad_size, 0)), constant_values=0) + # pack bits in big endian order + ary = np.packbits(ary, axis=-1, bitorder="big") + return ary diff --git a/releasenotes/notes/add-backend-sampler-v2-5e40135781eebc7f.yaml b/releasenotes/notes/add-backend-sampler-v2-5e40135781eebc7f.yaml new file mode 100644 index 000000000000..9cd9002a77be --- /dev/null +++ b/releasenotes/notes/add-backend-sampler-v2-5e40135781eebc7f.yaml @@ -0,0 +1,28 @@ +--- +features: + - | + The implementation :class:`~.BackendSamplerV2` of :class:`~.BaseSamplerV2` was added. + This sampler supports :class:`~.BackendV1` and :class:`~.BackendV2` that allow + ``memory`` option to compute bitstrings. + + .. code-block:: python + + import numpy as np + from qiskit import transpile + from qiskit.circuit.library import IQP + from qiskit.primitives import BackendSamplerV2 + from qiskit.providers.fake_provider import Fake7QPulseV1 + from qiskit.quantum_info import random_hermitian + + backend = Fake7QPulseV1() + sampler = BackendSamplerV2(backend=backend) + n_qubits = 5 + mat = np.real(random_hermitian(n_qubits, seed=1234)) + circuit = IQP(mat) + circuit.measure_all() + isa_circuit = transpile(circuit, backend=backend, optimization_level=1) + job = sampler.run([isa_circuit], shots=100) + result = job.result() + print(f"> bitstrings: {result[0].data.meas.get_bitstrings()}") + print(f"> counts: {result[0].data.meas.get_counts()}") + print(f"> Metadata: {result[0].metadata}") diff --git a/test/python/primitives/test_backend_sampler_v2.py b/test/python/primitives/test_backend_sampler_v2.py new file mode 100644 index 000000000000..ba9bfb6af7b4 --- /dev/null +++ b/test/python/primitives/test_backend_sampler_v2.py @@ -0,0 +1,655 @@ +# This code is part of Qiskit. +# +# (C) Copyright IBM 2024. +# +# This code is licensed under the Apache License, Version 2.0. You may +# obtain a copy of this license in the LICENSE.txt file in the root directory +# of this source tree or at http://www.apache.org/licenses/LICENSE-2.0. +# +# Any modifications or derivative works of this code must retain this +# copyright notice, and modified files need to carry a notice indicating +# that they have been altered from the originals. + +"""Tests for Backend Sampler V2.""" + +from __future__ import annotations + +import unittest +from dataclasses import astuple +from test import QiskitTestCase, combine + +import numpy as np +from ddt import ddt +from numpy.typing import NDArray + +from qiskit import ClassicalRegister, QuantumCircuit, QuantumRegister +from qiskit.circuit import Parameter +from qiskit.circuit.library import RealAmplitudes, UnitaryGate +from qiskit.primitives import PrimitiveResult, PubResult, StatevectorSampler +from qiskit.primitives.backend_sampler_v2 import BackendSamplerV2 +from qiskit.primitives.containers import BitArray +from qiskit.primitives.containers.data_bin import DataBin +from qiskit.primitives.containers.sampler_pub import SamplerPub +from qiskit.providers import JobStatus +from qiskit.providers.backend_compat import BackendV2Converter +from qiskit.providers.basic_provider import BasicSimulator +from qiskit.providers.fake_provider import Fake7QPulseV1 +from qiskit.transpiler.preset_passmanagers import generate_preset_pass_manager + +BACKENDS = [BasicSimulator(), Fake7QPulseV1(), BackendV2Converter(Fake7QPulseV1())] + + +@ddt +class TestBackendSamplerV2(QiskitTestCase): + """Test for BackendSamplerV2""" + + def setUp(self): + super().setUp() + self._shots = 10000 + self._seed = 123 + self._options = {"default_shots": self._shots, "seed_simulator": self._seed} + + self._cases = [] + hadamard = QuantumCircuit(1, 1, name="Hadamard") + hadamard.h(0) + hadamard.measure(0, 0) + self._cases.append((hadamard, None, {0: 5000, 1: 5000})) # case 0 + + bell = QuantumCircuit(2, name="Bell") + bell.h(0) + bell.cx(0, 1) + bell.measure_all() + self._cases.append((bell, None, {0: 5000, 3: 5000})) # case 1 + + pqc = RealAmplitudes(num_qubits=2, reps=2) + pqc.measure_all() + self._cases.append((pqc, [0] * 6, {0: 10000})) # case 2 + self._cases.append((pqc, [1] * 6, {0: 168, 1: 3389, 2: 470, 3: 5973})) # case 3 + self._cases.append((pqc, [0, 1, 1, 2, 3, 5], {0: 1339, 1: 3534, 2: 912, 3: 4215})) # case 4 + self._cases.append((pqc, [1, 2, 3, 4, 5, 6], {0: 634, 1: 291, 2: 6039, 3: 3036})) # case 5 + + pqc2 = RealAmplitudes(num_qubits=2, reps=3) + pqc2.measure_all() + self._cases.append( + (pqc2, [0, 1, 2, 3, 4, 5, 6, 7], {0: 1898, 1: 6864, 2: 928, 3: 311}) + ) # case 6 + + def _assert_allclose(self, bitarray: BitArray, target: NDArray | BitArray, rtol=1e-1, atol=5e2): + self.assertEqual(bitarray.shape, target.shape) + for idx in np.ndindex(bitarray.shape): + int_counts = bitarray.get_int_counts(idx) + target_counts = ( + target.get_int_counts(idx) if isinstance(target, BitArray) else target[idx] + ) + max_key = max(max(int_counts.keys()), max(target_counts.keys())) + ary = np.array([int_counts.get(i, 0) for i in range(max_key + 1)]) + tgt = np.array([target_counts.get(i, 0) for i in range(max_key + 1)]) + np.testing.assert_allclose(ary, tgt, rtol=rtol, atol=atol, err_msg=f"index: {idx}") + + @combine(backend=BACKENDS) + def test_sampler_run(self, backend): + """Test run().""" + pm = generate_preset_pass_manager(optimization_level=0, backend=backend) + + with self.subTest("single"): + bell, _, target = self._cases[1] + bell = pm.run(bell) + sampler = BackendSamplerV2(backend=backend, options=self._options) + job = sampler.run([bell], shots=self._shots) + result = job.result() + self.assertIsInstance(result, PrimitiveResult) + self.assertIsInstance(result.metadata, dict) + self.assertEqual(len(result), 1) + self.assertIsInstance(result[0], PubResult) + self.assertIsInstance(result[0].data, DataBin) + self.assertIsInstance(result[0].data.meas, BitArray) + self._assert_allclose(result[0].data.meas, np.array(target)) + + with self.subTest("single with param"): + pqc, param_vals, target = self._cases[2] + sampler = BackendSamplerV2(backend=backend, options=self._options) + pqc = pm.run(pqc) + params = (param.name for param in pqc.parameters) + job = sampler.run([(pqc, {params: param_vals})], shots=self._shots) + result = job.result() + self.assertIsInstance(result, PrimitiveResult) + self.assertIsInstance(result.metadata, dict) + self.assertEqual(len(result), 1) + self.assertIsInstance(result[0], PubResult) + self.assertIsInstance(result[0].data, DataBin) + self.assertIsInstance(result[0].data.meas, BitArray) + self._assert_allclose(result[0].data.meas, np.array(target)) + + with self.subTest("multiple"): + pqc, param_vals, target = self._cases[2] + sampler = BackendSamplerV2(backend=backend, options=self._options) + pqc = pm.run(pqc) + params = (param.name for param in pqc.parameters) + job = sampler.run( + [(pqc, {params: [param_vals, param_vals, param_vals]})], shots=self._shots + ) + result = job.result() + self.assertIsInstance(result, PrimitiveResult) + self.assertIsInstance(result.metadata, dict) + self.assertEqual(len(result), 1) + self.assertIsInstance(result[0], PubResult) + self.assertIsInstance(result[0].data, DataBin) + self.assertIsInstance(result[0].data.meas, BitArray) + self._assert_allclose(result[0].data.meas, np.array([target, target, target])) + + @combine(backend=BACKENDS) + def test_sampler_run_multiple_times(self, backend): + """Test run() returns the same results if the same input is given.""" + bell, _, _ = self._cases[1] + sampler = BackendSamplerV2(backend=backend, options=self._options) + pm = generate_preset_pass_manager(optimization_level=0, backend=backend) + bell = pm.run(bell) + result1 = sampler.run([bell], shots=self._shots).result() + meas1 = result1[0].data.meas + result2 = sampler.run([bell], shots=self._shots).result() + meas2 = result2[0].data.meas + self._assert_allclose(meas1, meas2, rtol=0) + + @combine(backend=BACKENDS) + def test_sample_run_multiple_circuits(self, backend): + """Test run() with multiple circuits.""" + bell, _, target = self._cases[1] + sampler = BackendSamplerV2(backend=backend, options=self._options) + pm = generate_preset_pass_manager(optimization_level=0, backend=backend) + bell = pm.run(bell) + result = sampler.run([bell, bell, bell], shots=self._shots).result() + self.assertEqual(len(result), 3) + self._assert_allclose(result[0].data.meas, np.array(target)) + self._assert_allclose(result[1].data.meas, np.array(target)) + self._assert_allclose(result[2].data.meas, np.array(target)) + + @combine(backend=BACKENDS) + def test_sampler_run_with_parameterized_circuits(self, backend): + """Test run() with parameterized circuits.""" + pqc1, param1, target1 = self._cases[4] + pqc2, param2, target2 = self._cases[5] + pqc3, param3, target3 = self._cases[6] + pm = generate_preset_pass_manager(optimization_level=0, backend=backend) + pqc1, pqc2, pqc3 = pm.run([pqc1, pqc2, pqc3]) + + sampler = BackendSamplerV2(backend=backend, options=self._options) + result = sampler.run( + [(pqc1, param1), (pqc2, param2), (pqc3, param3)], shots=self._shots + ).result() + self.assertEqual(len(result), 3) + self._assert_allclose(result[0].data.meas, np.array(target1)) + self._assert_allclose(result[1].data.meas, np.array(target2)) + self._assert_allclose(result[2].data.meas, np.array(target3)) + + @combine(backend=BACKENDS) + def test_run_1qubit(self, backend): + """test for 1-qubit cases""" + qc = QuantumCircuit(1) + qc.measure_all() + qc2 = QuantumCircuit(1) + qc2.x(0) + qc2.measure_all() + pm = generate_preset_pass_manager(optimization_level=0, backend=backend) + qc, qc2 = pm.run([qc, qc2]) + + sampler = BackendSamplerV2(backend=backend, options=self._options) + result = sampler.run([qc, qc2], shots=self._shots).result() + self.assertEqual(len(result), 2) + for i in range(2): + self._assert_allclose(result[i].data.meas, np.array({i: self._shots})) + + @combine(backend=BACKENDS) + def test_run_2qubit(self, backend): + """test for 2-qubit cases""" + qc0 = QuantumCircuit(2) + qc0.measure_all() + qc1 = QuantumCircuit(2) + qc1.x(0) + qc1.measure_all() + qc2 = QuantumCircuit(2) + qc2.x(1) + qc2.measure_all() + qc3 = QuantumCircuit(2) + qc3.x([0, 1]) + qc3.measure_all() + pm = generate_preset_pass_manager(optimization_level=0, backend=backend) + qc0, qc1, qc2, qc3 = pm.run([qc0, qc1, qc2, qc3]) + + sampler = BackendSamplerV2(backend=backend, options=self._options) + result = sampler.run([qc0, qc1, qc2, qc3], shots=self._shots).result() + self.assertEqual(len(result), 4) + for i in range(4): + self._assert_allclose(result[i].data.meas, np.array({i: self._shots})) + + @combine(backend=BACKENDS) + def test_run_single_circuit(self, backend): + """Test for single circuit case.""" + sampler = BackendSamplerV2(backend=backend, options=self._options) + pm = generate_preset_pass_manager(optimization_level=0, backend=backend) + + with self.subTest("No parameter"): + circuit, _, target = self._cases[1] + circuit = pm.run(circuit) + param_target = [ + (None, np.array(target)), + ({}, np.array(target)), + ] + for param, target in param_target: + with self.subTest(f"{circuit.name} w/ {param}"): + result = sampler.run([(circuit, param)], shots=self._shots).result() + self.assertEqual(len(result), 1) + self._assert_allclose(result[0].data.meas, target) + + with self.subTest("One parameter"): + circuit = QuantumCircuit(1, 1, name="X gate") + param = Parameter("x") + circuit.ry(param, 0) + circuit.measure(0, 0) + circuit = pm.run(circuit) + param_target = [ + ({"x": np.pi}, np.array({1: self._shots})), + ({param: np.pi}, np.array({1: self._shots})), + ({"x": np.array(np.pi)}, np.array({1: self._shots})), + ({param: np.array(np.pi)}, np.array({1: self._shots})), + ({"x": [np.pi]}, np.array({1: self._shots})), + ({param: [np.pi]}, np.array({1: self._shots})), + ({"x": np.array([np.pi])}, np.array({1: self._shots})), + ({param: np.array([np.pi])}, np.array({1: self._shots})), + ] + for param, target in param_target: + with self.subTest(f"{circuit.name} w/ {param}"): + result = sampler.run([(circuit, param)], shots=self._shots).result() + self.assertEqual(len(result), 1) + self._assert_allclose(result[0].data.c, target) + + with self.subTest("More than one parameter"): + circuit, param, target = self._cases[3] + circuit = pm.run(circuit) + param_target = [ + (param, np.array(target)), + (tuple(param), np.array(target)), + (np.array(param), np.array(target)), + ((param,), np.array([target])), + ([param], np.array([target])), + (np.array([param]), np.array([target])), + ] + for param, target in param_target: + with self.subTest(f"{circuit.name} w/ {param}"): + result = sampler.run([(circuit, param)], shots=self._shots).result() + self.assertEqual(len(result), 1) + self._assert_allclose(result[0].data.meas, target) + + @combine(backend=BACKENDS) + def test_run_reverse_meas_order(self, backend): + """test for sampler with reverse measurement order""" + x = Parameter("x") + y = Parameter("y") + + qc = QuantumCircuit(3, 3) + qc.rx(x, 0) + qc.rx(y, 1) + qc.x(2) + qc.measure(0, 2) + qc.measure(1, 1) + qc.measure(2, 0) + pm = generate_preset_pass_manager(optimization_level=0, backend=backend) + qc = pm.run(qc) + + sampler = BackendSamplerV2(backend=backend) + sampler.options.seed_simulator = self._seed + result = sampler.run([(qc, [0, 0]), (qc, [np.pi / 2, 0])], shots=self._shots).result() + self.assertEqual(len(result), 2) + + # qc({x: 0, y: 0}) + self._assert_allclose(result[0].data.c, np.array({1: self._shots})) + + # qc({x: pi/2, y: 0}) + self._assert_allclose(result[1].data.c, np.array({1: self._shots / 2, 5: self._shots / 2})) + + @combine(backend=BACKENDS) + def test_run_errors(self, backend): + """Test for errors with run method""" + qc1 = QuantumCircuit(1) + qc1.measure_all() + qc2 = RealAmplitudes(num_qubits=1, reps=1) + qc2.measure_all() + pm = generate_preset_pass_manager(optimization_level=0, backend=backend) + qc1, qc2 = pm.run([qc1, qc2]) + + sampler = BackendSamplerV2(backend=backend) + with self.subTest("set parameter values to a non-parameterized circuit"): + with self.assertRaises(ValueError): + _ = sampler.run([(qc1, [1e2])]).result() + with self.subTest("missing all parameter values for a parameterized circuit"): + with self.assertRaises(ValueError): + _ = sampler.run([qc2]).result() + with self.assertRaises(ValueError): + _ = sampler.run([(qc2, [])]).result() + with self.assertRaises(ValueError): + _ = sampler.run([(qc2, None)]).result() + with self.subTest("missing some parameter values for a parameterized circuit"): + with self.assertRaises(ValueError): + _ = sampler.run([(qc2, [1e2])]).result() + with self.subTest("too many parameter values for a parameterized circuit"): + with self.assertRaises(ValueError): + _ = sampler.run([(qc2, [1e2] * 100)]).result() + with self.subTest("negative shots, run arg"): + with self.assertRaises(ValueError): + _ = sampler.run([qc1], shots=-1).result() + with self.subTest("negative shots, pub-like"): + with self.assertRaises(ValueError): + _ = sampler.run([(qc1, None, -1)]).result() + with self.subTest("negative shots, pub"): + with self.assertRaises(ValueError): + _ = sampler.run([SamplerPub(qc1, shots=-1)]).result() + with self.subTest("zero shots, run arg"): + with self.assertRaises(ValueError): + _ = sampler.run([qc1], shots=0).result() + with self.subTest("zero shots, pub-like"): + with self.assertRaises(ValueError): + _ = sampler.run([(qc1, None, 0)]).result() + with self.subTest("zero shots, pub"): + with self.assertRaises(ValueError): + _ = sampler.run([SamplerPub(qc1, shots=0)]).result() + + @combine(backend=BACKENDS) + def test_run_empty_parameter(self, backend): + """Test for empty parameter""" + n = 5 + qc = QuantumCircuit(n, n - 1) + qc.measure(range(n - 1), range(n - 1)) + pm = generate_preset_pass_manager(optimization_level=0, backend=backend) + qc = pm.run(qc) + sampler = BackendSamplerV2(backend=backend, options=self._options) + with self.subTest("one circuit"): + result = sampler.run([qc], shots=self._shots).result() + self.assertEqual(len(result), 1) + self._assert_allclose(result[0].data.c, np.array({0: self._shots})) + + with self.subTest("two circuits"): + result = sampler.run([qc, qc], shots=self._shots).result() + self.assertEqual(len(result), 2) + for i in range(2): + self._assert_allclose(result[i].data.c, np.array({0: self._shots})) + + @combine(backend=BACKENDS) + def test_run_numpy_params(self, backend): + """Test for numpy array as parameter values""" + qc = RealAmplitudes(num_qubits=2, reps=2) + qc.measure_all() + pm = generate_preset_pass_manager(optimization_level=0, backend=backend) + qc = pm.run(qc) + k = 5 + params_array = np.linspace(0, 1, k * qc.num_parameters).reshape((k, qc.num_parameters)) + params_list = params_array.tolist() + sampler = StatevectorSampler(seed=self._seed) + target = sampler.run([(qc, params_list)], shots=self._shots).result() + + with self.subTest("ndarray"): + sampler = BackendSamplerV2(backend=backend, options=self._options) + result = sampler.run([(qc, params_array)], shots=self._shots).result() + self.assertEqual(len(result), 1) + self._assert_allclose(result[0].data.meas, target[0].data.meas) + + with self.subTest("split a list"): + sampler = BackendSamplerV2(backend=backend, options=self._options) + result = sampler.run( + [(qc, params) for params in params_list], shots=self._shots + ).result() + self.assertEqual(len(result), k) + for i in range(k): + self._assert_allclose( + result[i].data.meas, np.array(target[0].data.meas.get_int_counts(i)) + ) + + @combine(backend=BACKENDS) + def test_run_with_shots_option(self, backend): + """test with shots option.""" + bell, _, _ = self._cases[1] + pm = generate_preset_pass_manager(optimization_level=0, backend=backend) + bell = pm.run(bell) + shots = 100 + + with self.subTest("run arg"): + sampler = BackendSamplerV2(backend=backend, options=self._options) + result = sampler.run([bell], shots=shots).result() + self.assertEqual(len(result), 1) + self.assertEqual(result[0].data.meas.num_shots, shots) + self.assertEqual(sum(result[0].data.meas.get_counts().values()), shots) + + with self.subTest("default shots"): + sampler = BackendSamplerV2(backend=backend, options=self._options) + default_shots = sampler.options.default_shots + result = sampler.run([bell]).result() + self.assertEqual(len(result), 1) + self.assertEqual(result[0].data.meas.num_shots, default_shots) + self.assertEqual(sum(result[0].data.meas.get_counts().values()), default_shots) + + with self.subTest("setting default shots"): + default_shots = 100 + sampler = BackendSamplerV2(backend=backend, options=self._options) + sampler.options.default_shots = default_shots + self.assertEqual(sampler.options.default_shots, default_shots) + result = sampler.run([bell]).result() + self.assertEqual(len(result), 1) + self.assertEqual(result[0].data.meas.num_shots, default_shots) + self.assertEqual(sum(result[0].data.meas.get_counts().values()), default_shots) + + with self.subTest("pub-like"): + sampler = BackendSamplerV2(backend=backend, options=self._options) + result = sampler.run([(bell, None, shots)]).result() + self.assertEqual(len(result), 1) + self.assertEqual(result[0].data.meas.num_shots, shots) + self.assertEqual(sum(result[0].data.meas.get_counts().values()), shots) + + with self.subTest("pub"): + sampler = BackendSamplerV2(backend=backend, options=self._options) + result = sampler.run([SamplerPub(bell, shots=shots)]).result() + self.assertEqual(len(result), 1) + self.assertEqual(result[0].data.meas.num_shots, shots) + self.assertEqual(sum(result[0].data.meas.get_counts().values()), shots) + + with self.subTest("multiple pubs"): + sampler = BackendSamplerV2(backend=backend, options=self._options) + shots1 = 100 + shots2 = 200 + result = sampler.run( + [ + SamplerPub(bell, shots=shots1), + SamplerPub(bell, shots=shots2), + ], + shots=self._shots, + ).result() + self.assertEqual(len(result), 2) + self.assertEqual(result[0].data.meas.num_shots, shots1) + self.assertEqual(sum(result[0].data.meas.get_counts().values()), shots1) + self.assertEqual(result[1].data.meas.num_shots, shots2) + self.assertEqual(sum(result[1].data.meas.get_counts().values()), shots2) + + @combine(backend=BACKENDS) + def test_run_shots_result_size(self, backend): + """test with shots option to validate the result size""" + n = 7 # should be less than or equal to the number of qubits of backend + qc = QuantumCircuit(n) + qc.h(range(n)) + qc.measure_all() + pm = generate_preset_pass_manager(optimization_level=0, backend=backend) + qc = pm.run(qc) + sampler = BackendSamplerV2(backend=backend, options=self._options) + result = sampler.run([qc], shots=self._shots).result() + self.assertEqual(len(result), 1) + self.assertLessEqual(result[0].data.meas.num_shots, self._shots) + self.assertEqual(sum(result[0].data.meas.get_counts().values()), self._shots) + + @combine(backend=BACKENDS) + def test_primitive_job_status_done(self, backend): + """test primitive job's status""" + bell, _, _ = self._cases[1] + pm = generate_preset_pass_manager(optimization_level=0, backend=backend) + bell = pm.run(bell) + sampler = BackendSamplerV2(backend=backend, options=self._options) + job = sampler.run([bell], shots=self._shots) + _ = job.result() + self.assertEqual(job.status(), JobStatus.DONE) + + @combine(backend=BACKENDS) + def test_circuit_with_unitary(self, backend): + """Test for circuit with unitary gate.""" + pm = generate_preset_pass_manager(optimization_level=0, backend=backend) + + with self.subTest("identity"): + gate = UnitaryGate(np.eye(2)) + + circuit = QuantumCircuit(1) + circuit.append(gate, [0]) + circuit.measure_all() + circuit = pm.run(circuit) + + sampler = BackendSamplerV2(backend=backend, options=self._options) + result = sampler.run([circuit], shots=self._shots).result() + self.assertEqual(len(result), 1) + self._assert_allclose(result[0].data.meas, np.array({0: self._shots})) + + with self.subTest("X"): + gate = UnitaryGate([[0, 1], [1, 0]]) + + circuit = QuantumCircuit(1) + circuit.append(gate, [0]) + circuit.measure_all() + circuit = pm.run(circuit) + + sampler = BackendSamplerV2(backend=backend, options=self._options) + result = sampler.run([circuit], shots=self._shots).result() + self.assertEqual(len(result), 1) + self._assert_allclose(result[0].data.meas, np.array({1: self._shots})) + + @combine(backend=BACKENDS) + def test_circuit_with_multiple_cregs(self, backend): + """Test for circuit with multiple classical registers.""" + pm = generate_preset_pass_manager(optimization_level=0, backend=backend) + cases = [] + + # case 1 + a = ClassicalRegister(1, "a") + b = ClassicalRegister(2, "b") + c = ClassicalRegister(3, "c") + + qc = QuantumCircuit(QuantumRegister(3), a, b, c) + qc.h(range(3)) + qc.measure([0, 1, 2, 2], [0, 2, 4, 5]) + qc = pm.run(qc) + target = {"a": {0: 5000, 1: 5000}, "b": {0: 5000, 2: 5000}, "c": {0: 5000, 6: 5000}} + cases.append(("use all cregs", qc, target)) + + # case 2 + a = ClassicalRegister(1, "a") + b = ClassicalRegister(5, "b") + c = ClassicalRegister(3, "c") + + qc = QuantumCircuit(QuantumRegister(3), a, b, c) + qc.h(range(3)) + qc.measure([0, 1, 2, 2], [0, 2, 4, 5]) + qc = pm.run(qc) + target = { + "a": {0: 5000, 1: 5000}, + "b": {0: 2500, 2: 2500, 24: 2500, 26: 2500}, + "c": {0: 10000}, + } + cases.append(("use only a and b", qc, target)) + + # case 3 + a = ClassicalRegister(1, "a") + b = ClassicalRegister(2, "b") + c = ClassicalRegister(3, "c") + + qc = QuantumCircuit(QuantumRegister(3), a, b, c) + qc.h(range(3)) + qc.measure(1, 5) + qc = pm.run(qc) + target = {"a": {0: 10000}, "b": {0: 10000}, "c": {0: 5000, 4: 5000}} + cases.append(("use only c", qc, target)) + + # case 4 + a = ClassicalRegister(1, "a") + b = ClassicalRegister(2, "b") + c = ClassicalRegister(3, "c") + + qc = QuantumCircuit(QuantumRegister(3), a, b, c) + qc.h(range(3)) + qc.measure([0, 1, 2], [5, 5, 5]) + qc = pm.run(qc) + target = {"a": {0: 10000}, "b": {0: 10000}, "c": {0: 5000, 4: 5000}} + cases.append(("use only c multiple qubits", qc, target)) + + # case 5 + a = ClassicalRegister(1, "a") + b = ClassicalRegister(2, "b") + c = ClassicalRegister(3, "c") + + qc = QuantumCircuit(QuantumRegister(3), a, b, c) + qc.h(range(3)) + qc = pm.run(qc) + target = {"a": {0: 10000}, "b": {0: 10000}, "c": {0: 10000}} + cases.append(("no measure", qc, target)) + + for title, qc, target in cases: + with self.subTest(title): + sampler = BackendSamplerV2(backend=backend, options=self._options) + result = sampler.run([qc], shots=self._shots).result() + self.assertEqual(len(result), 1) + data = result[0].data + self.assertEqual(len(astuple(data)), 3) + for creg in qc.cregs: + self.assertTrue(hasattr(data, creg.name)) + self._assert_allclose(getattr(data, creg.name), np.array(target[creg.name])) + + @combine(backend=BACKENDS) + def test_circuit_with_aliased_cregs(self, backend): + """Test for circuit with aliased classical registers.""" + q = QuantumRegister(3, "q") + c1 = ClassicalRegister(1, "c1") + c2 = ClassicalRegister(1, "c2") + + qc = QuantumCircuit(q, c1, c2) + qc.ry(np.pi / 4, 2) + qc.cx(2, 1) + qc.cx(0, 1) + qc.h(0) + qc.measure(0, c1) + qc.measure(1, c2) + qc.z(2).c_if(c1, 1) + qc.x(2).c_if(c2, 1) + qc2 = QuantumCircuit(5, 5) + qc2.compose(qc, [0, 2, 3], [2, 4], inplace=True) + cregs = [creg.name for creg in qc2.cregs] + target = { + cregs[0]: {0: 4255, 4: 4297, 16: 720, 20: 726}, + cregs[1]: {0: 5000, 1: 5000}, + cregs[2]: {0: 8500, 1: 1500}, + } + + sampler = BackendSamplerV2(backend=backend, options=self._options) + pm = generate_preset_pass_manager(optimization_level=0, backend=backend) + qc2 = pm.run(qc2) + result = sampler.run([qc2], shots=self._shots).result() + self.assertEqual(len(result), 1) + data = result[0].data + self.assertEqual(len(astuple(data)), 3) + for creg_name in target: + self.assertTrue(hasattr(data, creg_name)) + self._assert_allclose(getattr(data, creg_name), np.array(target[creg_name])) + + @combine(backend=BACKENDS) + def test_no_cregs(self, backend): + """Test that the sampler works when there are no classical register in the circuit.""" + qc = QuantumCircuit(2) + sampler = BackendSamplerV2(backend=backend, options=self._options) + with self.assertWarns(UserWarning): + result = sampler.run([qc]).result() + + self.assertEqual(len(result), 1) + self.assertEqual(len(result[0].data), 0) + + +if __name__ == "__main__": + unittest.main()