From 385e10e9db69e660ba7f3be5295d365a245d5f94 Mon Sep 17 00:00:00 2001 From: Kevin Tian Date: Thu, 25 Jan 2024 12:04:01 -0500 Subject: [PATCH] Add transpiler tests (#1323) * Add transpiler tests * remove qiskittestcase --- test/unit/transpiler/__init__.py | 11 + test/unit/transpiler/passes/__init__.py | 11 + test/unit/transpiler/passes/basis/__init__.py | 11 + .../passes/basis/test_convert_id_to_delay.py | 99 + .../transpiler/passes/scheduling/__init__.py | 11 + .../scheduling/control_flow_test_case.py | 36 + .../scheduling/test_dynamical_decoupling.py | 1072 ++++++++++ .../passes/scheduling/test_scheduler.py | 1860 +++++++++++++++++ .../passes/scheduling/test_utils.py | 65 + 9 files changed, 3176 insertions(+) create mode 100644 test/unit/transpiler/__init__.py create mode 100644 test/unit/transpiler/passes/__init__.py create mode 100644 test/unit/transpiler/passes/basis/__init__.py create mode 100644 test/unit/transpiler/passes/basis/test_convert_id_to_delay.py create mode 100644 test/unit/transpiler/passes/scheduling/__init__.py create mode 100644 test/unit/transpiler/passes/scheduling/control_flow_test_case.py create mode 100644 test/unit/transpiler/passes/scheduling/test_dynamical_decoupling.py create mode 100644 test/unit/transpiler/passes/scheduling/test_scheduler.py create mode 100644 test/unit/transpiler/passes/scheduling/test_utils.py diff --git a/test/unit/transpiler/__init__.py b/test/unit/transpiler/__init__.py new file mode 100644 index 000000000..139b265e1 --- /dev/null +++ b/test/unit/transpiler/__init__.py @@ -0,0 +1,11 @@ +# This code is part of Qiskit. +# +# (C) Copyright IBM 2024. +# +# This code is licensed under the Apache License, Version 2.0. You may +# obtain a copy of this license in the LICENSE.txt file in the root directory +# of this source tree or at http://www.apache.org/licenses/LICENSE-2.0. +# +# Any modifications or derivative works of this code must retain this +# copyright notice, and modified files need to carry a notice indicating +# that they have been altered from the originals. diff --git a/test/unit/transpiler/passes/__init__.py b/test/unit/transpiler/passes/__init__.py new file mode 100644 index 000000000..139b265e1 --- /dev/null +++ b/test/unit/transpiler/passes/__init__.py @@ -0,0 +1,11 @@ +# This code is part of Qiskit. +# +# (C) Copyright IBM 2024. +# +# This code is licensed under the Apache License, Version 2.0. You may +# obtain a copy of this license in the LICENSE.txt file in the root directory +# of this source tree or at http://www.apache.org/licenses/LICENSE-2.0. +# +# Any modifications or derivative works of this code must retain this +# copyright notice, and modified files need to carry a notice indicating +# that they have been altered from the originals. diff --git a/test/unit/transpiler/passes/basis/__init__.py b/test/unit/transpiler/passes/basis/__init__.py new file mode 100644 index 000000000..139b265e1 --- /dev/null +++ b/test/unit/transpiler/passes/basis/__init__.py @@ -0,0 +1,11 @@ +# This code is part of Qiskit. +# +# (C) Copyright IBM 2024. +# +# This code is licensed under the Apache License, Version 2.0. You may +# obtain a copy of this license in the LICENSE.txt file in the root directory +# of this source tree or at http://www.apache.org/licenses/LICENSE-2.0. +# +# Any modifications or derivative works of this code must retain this +# copyright notice, and modified files need to carry a notice indicating +# that they have been altered from the originals. diff --git a/test/unit/transpiler/passes/basis/test_convert_id_to_delay.py b/test/unit/transpiler/passes/basis/test_convert_id_to_delay.py new file mode 100644 index 000000000..617b86f1e --- /dev/null +++ b/test/unit/transpiler/passes/basis/test_convert_id_to_delay.py @@ -0,0 +1,99 @@ +# This code is part of Qiskit. +# +# (C) Copyright IBM 2024. +# +# This code is licensed under the Apache License, Version 2.0. You may +# obtain a copy of this license in the LICENSE.txt file in the root directory +# of this source tree or at http://www.apache.org/licenses/LICENSE-2.0. +# +# Any modifications or derivative works of this code must retain this +# copyright notice, and modified files need to carry a notice indicating +# that they have been altered from the originals. + +"""Test the conversion of Id gate operations to a delay.""" + +from qiskit.circuit import QuantumCircuit +from qiskit.transpiler.passmanager import PassManager + +from qiskit_ibm_runtime.transpiler.passes.basis.convert_id_to_delay import ( + ConvertIdToDelay, +) + +from qiskit_ibm_runtime.transpiler.passes.scheduling.utils import ( + DynamicCircuitInstructionDurations, +) + +from .....ibm_test_case import IBMTestCase + +# pylint: disable=invalid-name + + +class TestConvertIdToDelay(IBMTestCase): + """Tests the ConvertIdToDelay pass""" + + def setUp(self): + """Setup.""" + super().setUp() + + self.durations = DynamicCircuitInstructionDurations([("sx", None, 160), ("x", None, 200)]) + + def test_id_gate(self): + """Test if Id gate is converted a delay.""" + qc = QuantumCircuit(1, 0) + qc.id(0) + + pm = PassManager([ConvertIdToDelay(self.durations)]) + transformed = pm.run(qc) + + expected = QuantumCircuit(1, 0) + expected.delay(160, 0) + + self.assertEqual(expected, transformed) + + def test_id_gate_unit(self): + """Test if Id gate is converted a delay with correct units.""" + qc = QuantumCircuit(1, 0) + qc.id(0) + + pm = PassManager([ConvertIdToDelay(self.durations, "x")]) + transformed = pm.run(qc) + + expected = QuantumCircuit(1, 0) + expected.delay(200, 0) + + self.assertEqual(expected, transformed) + + def test_c_if_id_gate(self): + """Test if c_if Id gate is converted a c_if delay.""" + qc = QuantumCircuit(1, 1) + + with qc.if_test((0, 1)): # pylint: disable=not-context-manager + qc.id(0) + + pm = PassManager([ConvertIdToDelay(self.durations)]) + transformed = pm.run(qc) + + expected = QuantumCircuit(1, 1) + with expected.if_test((0, 1)): # pylint: disable=not-context-manager + expected.delay(160, 0) + + self.assertEqual(expected, transformed) + + def test_if_test_id_gate(self): + """Test if if_test Id gate is converted a if_test delay.""" + qc = QuantumCircuit(1, 1) + with qc.if_test((0, 1)) as else_: # pylint: disable=not-context-manager + qc.id(0) + with else_: # pylint: disable=not-context-manager + qc.id(0) + + pm = PassManager([ConvertIdToDelay(self.durations)]) + transformed = pm.run(qc) + + expected = QuantumCircuit(1, 1) + with expected.if_test((0, 1)) as else_: # pylint: disable=not-context-manager + expected.delay(160, 0) + with else_: + expected.delay(160, 0) + + self.assertEqual(expected, transformed) diff --git a/test/unit/transpiler/passes/scheduling/__init__.py b/test/unit/transpiler/passes/scheduling/__init__.py new file mode 100644 index 000000000..139b265e1 --- /dev/null +++ b/test/unit/transpiler/passes/scheduling/__init__.py @@ -0,0 +1,11 @@ +# This code is part of Qiskit. +# +# (C) Copyright IBM 2024. +# +# This code is licensed under the Apache License, Version 2.0. You may +# obtain a copy of this license in the LICENSE.txt file in the root directory +# of this source tree or at http://www.apache.org/licenses/LICENSE-2.0. +# +# Any modifications or derivative works of this code must retain this +# copyright notice, and modified files need to carry a notice indicating +# that they have been altered from the originals. diff --git a/test/unit/transpiler/passes/scheduling/control_flow_test_case.py b/test/unit/transpiler/passes/scheduling/control_flow_test_case.py new file mode 100644 index 000000000..b64822883 --- /dev/null +++ b/test/unit/transpiler/passes/scheduling/control_flow_test_case.py @@ -0,0 +1,36 @@ +# This code is part of Qiskit. +# +# (C) Copyright IBM 2022, 2023. +# +# This code is licensed under the Apache License, Version 2.0. You may +# obtain a copy of this license in the LICENSE.txt file in the root directory +# of this source tree or at http://www.apache.org/licenses/LICENSE-2.0. +# +# Any modifications or derivative works of this code must retain this +# copyright notice, and modified files need to carry a notice indicating +# that they have been altered from the originals. + +"""Enhanced test case for control flow circuits.""" + +from typing import Any, Optional + +from qiskit import QuantumCircuit +from qiskit.test._canonical import canonicalize_control_flow + +from .....ibm_test_case import IBMTestCase + + +class ControlFlowTestCase(IBMTestCase): + """Test case that enforces control flow canonicalization of quantum circuits.""" + + def assertEqual( # pylint: disable=arguments-differ + self, first: Any, second: Any, msg: Optional[str] = None + ) -> None: + """Modify assertEqual to canonicalize the quantum circuit.""" + if isinstance(first, QuantumCircuit): + first = canonicalize_control_flow(first) + + if isinstance(second, QuantumCircuit): + second = canonicalize_control_flow(second) + + super().assertEqual(first, second, msg=msg) # pylint: disable=no-value-for-parameter diff --git a/test/unit/transpiler/passes/scheduling/test_dynamical_decoupling.py b/test/unit/transpiler/passes/scheduling/test_dynamical_decoupling.py new file mode 100644 index 000000000..145420478 --- /dev/null +++ b/test/unit/transpiler/passes/scheduling/test_dynamical_decoupling.py @@ -0,0 +1,1072 @@ +# This code is part of Qiskit. +# +# (C) Copyright IBM 2021. +# +# This code is licensed under the Apache License, Version 2.0. You may +# obtain a copy of this license in the LICENSE.txt file in the root directory +# of this source tree or at http://www.apache.org/licenses/LICENSE-2.0. +# +# Any modifications or derivative works of this code must retain this +# copyright notice, and modified files need to carry a notice indicating +# that they have been altered from the originals. + +"""Test dynamical decoupling insertion pass.""" + +import numpy as np +from numpy import pi + +from ddt import ddt, data +from qiskit import pulse +from qiskit.circuit import QuantumCircuit, Delay +from qiskit.circuit.library import XGate, YGate, RXGate, UGate +from qiskit.quantum_info import Operator +from qiskit.transpiler.passmanager import PassManager +from qiskit.transpiler.exceptions import TranspilerError +from qiskit.transpiler.coupling import CouplingMap +from qiskit.converters import circuit_to_dag + +from qiskit_ibm_runtime.transpiler.passes.scheduling.dynamical_decoupling import ( + PadDynamicalDecoupling, +) +from qiskit_ibm_runtime.transpiler.passes.scheduling.scheduler import ( + ASAPScheduleAnalysis, +) +from qiskit_ibm_runtime.transpiler.passes.scheduling.utils import ( + DynamicCircuitInstructionDurations, +) + +from .control_flow_test_case import ControlFlowTestCase + +# pylint: disable=invalid-name,not-context-manager + + +@ddt +class TestPadDynamicalDecoupling(ControlFlowTestCase): + """Tests PadDynamicalDecoupling pass.""" + + def setUp(self): + """Circuits to test dynamical decoupling on.""" + super().setUp() + + self.ghz4 = QuantumCircuit(4) + self.ghz4.h(0) + self.ghz4.cx(0, 1) + self.ghz4.cx(1, 2) + self.ghz4.cx(2, 3) + + self.midmeas = QuantumCircuit(3, 1) + self.midmeas.cx(0, 1) + self.midmeas.cx(1, 2) + self.midmeas.u(pi, 0, pi, 0) + self.midmeas.measure(2, 0) + self.midmeas.cx(1, 2) + self.midmeas.cx(0, 1) + + self.durations = DynamicCircuitInstructionDurations( + [ + ("h", 0, 50), + ("cx", [0, 1], 700), + ("cx", [1, 2], 200), + ("cx", [2, 3], 300), + ("x", None, 50), + ("y", None, 50), + ("u", None, 100), + ("rx", None, 100), + ("measure", None, 840), + ("reset", None, 1340), + ] + ) + + self.coupling_map = CouplingMap([[0, 1], [1, 2], [2, 3]]) + + def test_insert_dd_ghz(self): + """Test DD gates are inserted in correct spots.""" + dd_sequence = [XGate(), XGate()] + pm = PassManager( + [ + ASAPScheduleAnalysis(self.durations), + PadDynamicalDecoupling( + self.durations, + dd_sequence, + pulse_alignment=1, + sequence_min_length_ratios=[1.0], + schedule_idle_qubits=True, + ), + ] + ) + + ghz4_dd = pm.run(self.ghz4) + + expected = self.ghz4.copy() + expected = expected.compose(Delay(50), [1], front=True) + expected = expected.compose(Delay(750), [2], front=True) + expected = expected.compose(Delay(950), [3], front=True) + + expected = expected.compose(Delay(100), [0]) + expected = expected.compose(XGate(), [0]) + expected = expected.compose(Delay(200), [0]) + expected = expected.compose(XGate(), [0]) + expected = expected.compose(Delay(100), [0]) + + expected = expected.compose(Delay(50), [1]) + expected = expected.compose(XGate(), [1]) + expected = expected.compose(Delay(100), [1]) + expected = expected.compose(XGate(), [1]) + expected = expected.compose(Delay(50), [1]) + + self.assertEqual(ghz4_dd, expected) + + def test_insert_dd_ghz_one_qubit(self): + """Test DD gates are inserted on only one qubit.""" + dd_sequence = [XGate(), XGate()] + pm = PassManager( + [ + ASAPScheduleAnalysis(self.durations), + PadDynamicalDecoupling( + self.durations, + dd_sequence, + qubits=[0], + pulse_alignment=1, + schedule_idle_qubits=True, + ), + ] + ) + + ghz4_dd = pm.run(self.ghz4.measure_all(inplace=False)) + + expected = self.ghz4.copy() + expected = expected.compose(Delay(50), [1], front=True) + expected = expected.compose(Delay(750), [2], front=True) + expected = expected.compose(Delay(950), [3], front=True) + + expected = expected.compose(Delay(100), [0]) + expected = expected.compose(XGate(), [0]) + expected = expected.compose(Delay(200), [0]) + expected = expected.compose(XGate(), [0]) + expected = expected.compose(Delay(100), [0]) + + expected = expected.compose(Delay(300), [1]) + + expected.measure_all() + + self.assertEqual(ghz4_dd, expected) + + def test_insert_dd_ghz_everywhere(self): + """Test DD gates even on initial idle spots.""" + dd_sequence = [YGate(), YGate()] + pm = PassManager( + [ + ASAPScheduleAnalysis(self.durations), + PadDynamicalDecoupling( + self.durations, + dd_sequence, + skip_reset_qubits=False, + pulse_alignment=1, + sequence_min_length_ratios=[0.0], + schedule_idle_qubits=True, + ), + ] + ) + + ghz4_dd = pm.run(self.ghz4) + + expected = self.ghz4.copy() + expected = expected.compose(Delay(50), [1], front=True) + + expected = expected.compose(Delay(100), [0]) + expected = expected.compose(YGate(), [0]) + expected = expected.compose(Delay(200), [0]) + expected = expected.compose(YGate(), [0]) + expected = expected.compose(Delay(100), [0]) + + expected = expected.compose(Delay(50), [1]) + expected = expected.compose(YGate(), [1]) + expected = expected.compose(Delay(100), [1]) + expected = expected.compose(YGate(), [1]) + expected = expected.compose(Delay(50), [1]) + + expected = expected.compose(Delay(750), [2], front=True) + expected = expected.compose(Delay(950), [3], front=True) + + self.assertEqual(ghz4_dd, expected) + + def test_insert_dd_ghz_xy4(self): + """Test XY4 sequence of DD gates.""" + dd_sequence = [XGate(), YGate(), XGate(), YGate()] + pm = PassManager( + [ + ASAPScheduleAnalysis(self.durations), + PadDynamicalDecoupling( + self.durations, + dd_sequence, + pulse_alignment=1, + sequence_min_length_ratios=[1.0], + schedule_idle_qubits=True, + ), + ] + ) + + ghz4_dd = pm.run(self.ghz4) + + expected = self.ghz4.copy() + expected = expected.compose(Delay(50), [1], front=True) + expected = expected.compose(Delay(750), [2], front=True) + expected = expected.compose(Delay(950), [3], front=True) + + expected = expected.compose(Delay(37), [0]) + expected = expected.compose(XGate(), [0]) + expected = expected.compose(Delay(75), [0]) + expected = expected.compose(YGate(), [0]) + expected = expected.compose(Delay(76), [0]) + expected = expected.compose(XGate(), [0]) + expected = expected.compose(Delay(75), [0]) + expected = expected.compose(YGate(), [0]) + expected = expected.compose(Delay(37), [0]) + + expected = expected.compose(Delay(12), [1]) + expected = expected.compose(XGate(), [1]) + expected = expected.compose(Delay(25), [1]) + expected = expected.compose(YGate(), [1]) + expected = expected.compose(Delay(26), [1]) + expected = expected.compose(XGate(), [1]) + expected = expected.compose(Delay(25), [1]) + expected = expected.compose(YGate(), [1]) + expected = expected.compose(Delay(12), [1]) + + self.assertEqual(ghz4_dd, expected) + + def test_insert_midmeas_hahn(self): + """Test a single X gate as Hahn echo can absorb in the upstream circuit.""" + dd_sequence = [RXGate(pi / 4)] + pm = PassManager( + [ + ASAPScheduleAnalysis(self.durations), + PadDynamicalDecoupling( + self.durations, + dd_sequence, + pulse_alignment=1, + schedule_idle_qubits=True, + ), + ] + ) + + midmeas_dd = pm.run(self.midmeas) + + combined_u = UGate(3 * pi / 4, -pi / 2, pi / 2) + + expected = QuantumCircuit(3, 1) + expected.cx(0, 1) + expected.compose(combined_u, [0], inplace=True) + expected.delay(600, 0) + expected.rx(pi / 4, 0) + expected.delay(600, 0) + expected.delay(700, 2) + expected.cx(1, 2) + expected.delay(1000, 1) + expected.measure(2, 0) + expected.cx(1, 2) + expected.cx(0, 1) + expected.delay(700, 2) + + self.assertEqual(midmeas_dd, expected) + # check the absorption into U was done correctly + self.assertTrue( + Operator(XGate()).equiv( + Operator(UGate(3 * pi / 4, -pi / 2, pi / 2)) & Operator(RXGate(pi / 4)) + ) + ) + + def test_insert_ghz_uhrig(self): + """Test custom spacing (following Uhrig DD [1]). + [1] Uhrig, G. "Keeping a quantum bit alive by optimized π-pulse sequences." + Physical Review Letters 98.10 (2007): 100504.""" + n = 8 + dd_sequence = [XGate()] * n + + # uhrig specifies the location of the k'th pulse + def uhrig(k): + return np.sin(np.pi * (k + 1) / (2 * n + 2)) ** 2 + + # convert that to spacing between pulses (whatever finite duration pulses have) + spacing = [] + for k in range(n): + spacing.append(uhrig(k) - sum(spacing)) + spacing.append(1 - sum(spacing)) + + pm = PassManager( + [ + ASAPScheduleAnalysis(self.durations), + PadDynamicalDecoupling( + self.durations, + dd_sequence, + qubits=[0], + spacings=spacing, + sequence_min_length_ratios=[0.0], + pulse_alignment=1, + schedule_idle_qubits=True, + ), + ] + ) + + ghz4_dd = pm.run(self.ghz4) + + expected = self.ghz4.copy() + expected = expected.compose(Delay(50), [1], front=True) + expected = expected.compose(Delay(750), [2], front=True) + expected = expected.compose(Delay(950), [3], front=True) + + expected = expected.compose(Delay(3), [0]) + expected = expected.compose(XGate(), [0]) + expected = expected.compose(Delay(8), [0]) + expected = expected.compose(XGate(), [0]) + expected = expected.compose(Delay(13), [0]) + expected = expected.compose(XGate(), [0]) + expected = expected.compose(Delay(16), [0]) + expected = expected.compose(XGate(), [0]) + expected = expected.compose(Delay(20), [0]) + expected = expected.compose(XGate(), [0]) + expected = expected.compose(Delay(16), [0]) + expected = expected.compose(XGate(), [0]) + expected = expected.compose(Delay(13), [0]) + expected = expected.compose(XGate(), [0]) + expected = expected.compose(Delay(8), [0]) + expected = expected.compose(XGate(), [0]) + expected = expected.compose(Delay(3), [0]) + + expected = expected.compose(Delay(300), [1]) + + self.assertEqual(ghz4_dd, expected) + + def test_asymmetric_xy4_in_t2(self): + """Test insertion of XY4 sequence with unbalanced spacing.""" + dd_sequence = [XGate(), YGate()] * 2 + spacing = [0] + [1 / 4] * 4 + pm = PassManager( + [ + ASAPScheduleAnalysis(self.durations), + PadDynamicalDecoupling( + self.durations, + dd_sequence, + pulse_alignment=1, + spacings=spacing, + schedule_idle_qubits=True, + ), + ] + ) + + t2 = QuantumCircuit(1) + t2.h(0) + t2.delay(2000, 0) + t2.h(0) + + expected = QuantumCircuit(1) + expected.h(0) + expected.x(0) + expected.delay(450, 0) + expected.y(0) + expected.delay(450, 0) + expected.x(0) + expected.delay(450, 0) + expected.y(0) + expected.delay(450, 0) + expected.h(0) + expected.global_phase = pi + + t2_dd = pm.run(t2) + + self.assertEqual(t2_dd, expected) + # check global phase is correct + self.assertEqual(Operator(t2), Operator(expected)) + + def test_dd_after_reset(self): + """Test skip_reset_qubits option works.""" + dd_sequence = [XGate(), XGate()] + spacing = [0.1, 0.9] + pm = PassManager( + [ + ASAPScheduleAnalysis(self.durations), + PadDynamicalDecoupling( + self.durations, + dd_sequence, + spacings=spacing, + skip_reset_qubits=True, + pulse_alignment=1, + sequence_min_length_ratios=[0.0], + schedule_idle_qubits=True, + ), + ] + ) + + t2 = QuantumCircuit(1) + t2.reset(0) + t2.delay(1000) + t2.h(0) + t2.delay(2000, 0) + t2.h(0) + + expected = QuantumCircuit(1) + expected.reset(0) + expected.barrier() + expected.delay(90) + expected.x(0) + expected.delay(810) + expected.x(0) + expected.h(0) + expected.delay(190, 0) + expected.x(0) + expected.delay(1710, 0) + expected.x(0) + expected.h(0) + + t2_dd = pm.run(t2) + + self.assertEqual(t2_dd, expected) + + def test_insert_dd_bad_sequence(self): + """Test DD raises when non-identity sequence is inserted.""" + dd_sequence = [XGate(), YGate()] + pm = PassManager( + [ + ASAPScheduleAnalysis(self.durations), + PadDynamicalDecoupling(self.durations, dd_sequence, schedule_idle_qubits=True), + ] + ) + + with self.assertRaises(TranspilerError): + pm.run(self.ghz4) + + @data(0.5, 1.5) + def test_dd_with_calibrations_with_parameters(self, param_value): + """Check that calibrations in a circuit with parameters work fine.""" + + circ = QuantumCircuit(2) + circ.x(0) + circ.cx(0, 1) + circ.rx(param_value, 1) + + rx_duration = int(param_value * 1000) + + with pulse.build() as rx: + pulse.play( + pulse.Gaussian(rx_duration, 0.1, rx_duration // 4), + pulse.DriveChannel(1), + ) + + circ.add_calibration("rx", (1,), rx, params=[param_value]) + + durations = DynamicCircuitInstructionDurations([("x", None, 100), ("cx", None, 300)]) + + dd_sequence = [XGate(), XGate()] + pm = PassManager( + [ + ASAPScheduleAnalysis(durations), + PadDynamicalDecoupling(durations, dd_sequence, schedule_idle_qubits=True), + ] + ) + dd_circuit = pm.run(circ) + + for instruction in dd_circuit.data: + op = instruction.operation + if isinstance(op, RXGate): + self.assertEqual(op.duration, rx_duration) + + def test_insert_dd_ghz_xy4_with_alignment(self): + """Test DD with pulse alignment constraints.""" + dd_sequence = [XGate(), YGate(), XGate(), YGate()] + pm = PassManager( + [ + ASAPScheduleAnalysis(self.durations), + PadDynamicalDecoupling( + self.durations, + dd_sequence, + pulse_alignment=10, + extra_slack_distribution="edges", + sequence_min_length_ratios=[1.0], + schedule_idle_qubits=True, + ), + ] + ) + + ghz4_dd = pm.run(self.ghz4) + + expected = self.ghz4.copy() + expected = expected.compose(Delay(50), [1], front=True) + expected = expected.compose(Delay(750), [2], front=True) + expected = expected.compose(Delay(950), [3], front=True) + + expected = expected.compose(Delay(40), [0]) + expected = expected.compose(XGate(), [0]) + expected = expected.compose(Delay(70), [0]) + expected = expected.compose(YGate(), [0]) + expected = expected.compose(Delay(70), [0]) + expected = expected.compose(XGate(), [0]) + expected = expected.compose(Delay(70), [0]) + expected = expected.compose(YGate(), [0]) + expected = expected.compose(Delay(50), [0]) + + expected = expected.compose(Delay(20), [1]) + expected = expected.compose(XGate(), [1]) + expected = expected.compose(Delay(20), [1]) + expected = expected.compose(YGate(), [1]) + expected = expected.compose(Delay(20), [1]) + expected = expected.compose(XGate(), [1]) + expected = expected.compose(Delay(20), [1]) + expected = expected.compose(YGate(), [1]) + expected = expected.compose(Delay(20), [1]) + + self.assertEqual(ghz4_dd, expected) + + def test_dd_can_sequentially_called(self): + """Test if sequentially called DD pass can output the same circuit. + This test verifies: + - if global phase is properly propagated from the previous padding node. + - if node_start_time property is properly updated for new dag circuit. + """ + dd_sequence = [XGate(), YGate(), XGate(), YGate()] + + pm1 = PassManager( + [ + ASAPScheduleAnalysis(self.durations), + PadDynamicalDecoupling( + self.durations, dd_sequence, qubits=[0], schedule_idle_qubits=True + ), + PadDynamicalDecoupling( + self.durations, dd_sequence, qubits=[1], schedule_idle_qubits=True + ), + ] + ) + circ1 = pm1.run(self.ghz4) + + pm2 = PassManager( + [ + ASAPScheduleAnalysis(self.durations), + PadDynamicalDecoupling( + self.durations, + dd_sequence, + qubits=[0, 1], + schedule_idle_qubits=True, + ), + ] + ) + circ2 = pm2.run(self.ghz4) + + self.assertEqual(circ1, circ2) + + def test_back_to_back_if_test(self): + """Test DD with if_test circuit back to back.""" + + dd_sequence = [XGate(), XGate()] + pm = PassManager( + [ + ASAPScheduleAnalysis(self.durations), + PadDynamicalDecoupling( + self.durations, + dd_sequence, + pulse_alignment=1, + sequence_min_length_ratios=[0.0], + schedule_idle_qubits=True, + ), + ] + ) + + qc = QuantumCircuit(3, 1) + qc.delay(800, 1) + with qc.if_test((0, True)): + qc.x(1) + with qc.if_test((0, True)): + qc.x(2) + qc.delay(1000, 2) + qc.x(1) + + qc_dd = pm.run(qc) + + expected = QuantumCircuit(3, 1) + expected.delay(800, 0) + expected.delay(800, 1) + expected.delay(800, 2) + expected.barrier() + with expected.if_test((0, True)): + expected.delay(50, 0) + expected.x(1) + expected.delay(50, 2) + with expected.if_test((0, True)): + expected.delay(50, 0) + expected.delay(50, 1) + expected.x(2) + expected.barrier() + expected.delay(1000, 0) + expected.x(1) + expected.delay(212, 1) + expected.x(1) + expected.delay(426, 1) + expected.x(1) + expected.delay(212, 1) + expected.delay(225, 2) + expected.x(2) + expected.delay(450, 2) + expected.x(2) + expected.delay(225, 2) + + self.assertEqual(expected, qc_dd) + + def test_dd_if_test(self): + """Test DD with if_test circuit.""" + + dd_sequence = [XGate(), XGate()] + pm = PassManager( + [ + ASAPScheduleAnalysis(self.durations), + PadDynamicalDecoupling( + self.durations, + dd_sequence, + pulse_alignment=1, + sequence_min_length_ratios=[0.0], + schedule_idle_qubits=True, + ), + ] + ) + + qc = QuantumCircuit(3, 1) + qc.measure(0, 0) + qc.x(2) + qc.delay(1000, 1) + with qc.if_test((0, True)): + qc.x(1) + qc.delay(8000, 1) + with qc.if_test((0, True)): + qc.x(2) + qc.delay(1000, 2) + qc.x(0) + qc.x(2) + + qc_dd = pm.run(qc) + + expected = QuantumCircuit(3, 1) + expected.delay(1000, 1) + expected.x(2) + expected.measure(0, 0) + expected.delay(212, 2) + expected.x(2) + expected.delay(426, 2) + expected.x(2) + expected.delay(212, 2) + expected.barrier() + with expected.if_test((0, True)): + expected.delay(50, 0) + expected.x(1) + expected.delay(50, 2) + with expected.if_test((0, True)): + expected.delay(50, 0) + expected.delay(50, 1) + expected.x(2) + expected.barrier() + expected.x(0) + expected.delay(1962, 0) + expected.x(0) + expected.delay(3926, 0) + expected.x(0) + expected.delay(1962, 0) + expected.delay(1975, 1) + expected.x(1) + expected.delay(3950, 1) + expected.x(1) + expected.delay(1975, 1) + expected.delay(225, 2) + expected.x(2) + expected.delay(450, 2) + expected.x(2) + expected.delay(225, 2) + expected.x(2) + expected.delay(1712, 2) + expected.x(2) + expected.delay(3426, 2) + expected.x(2) + expected.delay(1712, 2) + + self.assertEqual(expected, qc_dd) + + def test_reproducible(self): + """Test DD calls are reproducible.""" + + qc = QuantumCircuit(3, 1) + qc.measure(0, 0) + qc.x(2) + qc.delay(1000, 1) + with qc.if_test((0, True)): + qc.x(1) + qc.delay(800, 1) + with qc.if_test((0, True)): + qc.x(2) + qc.delay(1000, 2) + qc.x(0) + qc.x(2) + + dd_sequence = [XGate(), XGate()] + pm0 = PassManager( + [ + ASAPScheduleAnalysis(self.durations), + PadDynamicalDecoupling(self.durations, dd_sequence, schedule_idle_qubits=True), + ] + ) + + pm1 = PassManager( + [ + ASAPScheduleAnalysis(self.durations), + PadDynamicalDecoupling(self.durations, dd_sequence, schedule_idle_qubits=True), + ] + ) + qc_dd0 = pm0.run(qc) + qc_dd1 = pm1.run(qc) + + self.assertEqual(qc_dd0, qc_dd1) + + def test_nested_block_dd(self): + """Test DD applied within a block.""" + + dd_sequence = [XGate(), XGate()] + pm = PassManager( + [ + ASAPScheduleAnalysis(self.durations), + PadDynamicalDecoupling( + self.durations, + dd_sequence, + pulse_alignment=1, + sequence_min_length_ratios=[0.0], + schedule_idle_qubits=True, + ), + ] + ) + + qc = QuantumCircuit(3, 1) + qc.x(1) + qc.x(2) + qc.barrier() + with qc.if_test((0, True)): + qc.delay(1000, 1) + + qc_dd = pm.run(qc) + + expected = QuantumCircuit(3, 1) + expected.delay(50, 0) + expected.x(1) + expected.x(2) + expected.barrier() + with expected.if_test((0, True)): + expected.delay(225, 1) + expected.x(1) + expected.delay(450, 1) + expected.x(1) + expected.delay(225, 1) + expected.delay(225, 2) + expected.x(2) + expected.delay(450, 2) + expected.x(2) + expected.delay(225, 2) + expected.delay(1000, 0) + + self.assertEqual(expected, qc_dd) + + def test_multiple_dd_sequences(self): + """Test multiple DD sequence can be submitted""" + + qc = QuantumCircuit(2, 0) + qc.x(0) # First delay so qubits are touched + qc.x(1) + qc.delay(500, 0) + qc.barrier() + qc.delay(2000, 1) + + dd_sequence = [ + [XGate(), XGate(), XGate(), XGate(), XGate(), XGate(), XGate(), XGate()], + [XGate(), XGate()], + ] + + pm = PassManager( + [ + ASAPScheduleAnalysis(self.durations), + PadDynamicalDecoupling( + self.durations, + dd_sequence, + pulse_alignment=1, + sequence_min_length_ratios=[1.5, 0.0], + schedule_idle_qubits=True, + ), + ] + ) + + qc_dd = pm.run(qc) + + expected = QuantumCircuit(2, 0) + expected.x(0) + expected.delay(100, 0) + expected.x(0) + expected.delay(200, 0) + expected.x(0) + expected.delay(100, 0) + expected.x(1) + expected.delay(100, 1) + expected.x(1) + expected.delay(200, 1) + expected.x(1) + expected.delay(100, 1) + expected.barrier() + expected.delay(100, 0) + expected.x(0) + expected.delay(200, 0) + expected.x(0) + expected.delay(200, 0) + expected.x(0) + expected.delay(200, 0) + expected.x(0) + expected.delay(200, 0) + expected.x(0) + expected.delay(200, 0) + expected.x(0) + expected.delay(200, 0) + expected.x(0) + expected.delay(200, 0) + expected.x(0) + expected.delay(100, 0) + expected.delay(100, 1) + expected.x(1) + expected.delay(200, 1) + expected.x(1) + expected.delay(200, 1) + expected.x(1) + expected.delay(200, 1) + expected.x(1) + expected.delay(200, 1) + expected.x(1) + expected.delay(200, 1) + expected.x(1) + expected.delay(200, 1) + expected.x(1) + expected.delay(200, 1) + expected.x(1) + expected.delay(100, 1) + + self.assertEqual(qc_dd, expected) + + def test_multiple_dd_sequence_cycles(self): + """Test a single DD sequence can be inserted for multiple cycles in a single delay.""" + + qc = QuantumCircuit(1, 0) + qc.x(0) # First delay so qubit is touched + qc.delay(2000, 0) + + dd_sequence = [ + [XGate(), XGate()], + ] # cycle has length of 100 cycles + + pm = PassManager( + [ + ASAPScheduleAnalysis(self.durations), + PadDynamicalDecoupling( + self.durations, + dd_sequence, + extra_slack_distribution="edges", + pulse_alignment=1, + sequence_min_length_ratios=[10.0], + insert_multiple_cycles=True, + schedule_idle_qubits=True, + ), + ] + ) + + qc_dd = pm.run(qc) + + expected = QuantumCircuit(1, 0) + expected.x(0) + expected.delay(225, 0) + expected.x(0) + expected.delay(450, 0) + expected.x(0) + expected.delay(225, 0) + expected.delay(225, 0) + expected.x(0) + expected.delay(450, 0) + expected.x(0) + expected.delay(225, 0) + self.assertEqual(qc_dd, expected) + + def test_staggered_dd(self): + """Test that timing on DD can be staggered if coupled with each other""" + dd_sequence = [XGate(), XGate()] + pm = PassManager( + [ + ASAPScheduleAnalysis(self.durations), + PadDynamicalDecoupling( + self.durations, + dd_sequence, + coupling_map=self.coupling_map, + alt_spacings=[0.1, 0.8, 0.1], + schedule_idle_qubits=True, + ), + ] + ) + + qc_barriers = QuantumCircuit(4, 1) + qc_barriers.x(0) + qc_barriers.x(1) + qc_barriers.x(2) + qc_barriers.x(3) + qc_barriers.barrier() + qc_barriers.measure(0, 0) + qc_barriers.delay(14, 0) + qc_barriers.x(1) + qc_barriers.x(2) + qc_barriers.x(3) + qc_barriers.barrier() + + qc_dd = pm.run(qc_barriers) + + expected = QuantumCircuit(4, 1) + expected.x(0) + expected.x(1) + expected.x(2) + expected.x(3) + expected.barrier() + expected.x(1) + expected.delay(208, 1) + expected.x(1) + expected.delay(448, 1) + expected.x(1) + expected.delay(208, 1) + expected.x(2) + expected.delay(80, 2) # q1-q2 are coupled, staggered delays + expected.x(2) + expected.delay(704, 2) + expected.x(2) + expected.delay(80, 2) # q2-q3 are uncoupled, same delays + expected.x(3) + expected.delay(208, 3) + expected.x(3) + expected.delay(448, 3) + expected.x(3) + expected.delay(208, 3) + expected.measure(0, 0) + expected.delay(14, 0) + expected.barrier() + + self.assertEqual(qc_dd, expected) + + def test_insert_dd_bad_spacings(self): + """Test DD raises when spacings don't add up to 1.""" + dd_sequence = [XGate(), XGate()] + pm = PassManager( + [ + ASAPScheduleAnalysis(self.durations), + PadDynamicalDecoupling( + self.durations, + dd_sequence, + spacings=[0.1, 0.9, 0.1], + coupling_map=self.coupling_map, + ), + ] + ) + + with self.assertRaises(TranspilerError): + pm.run(self.ghz4) + + def test_insert_dd_bad_alt_spacings(self): + """Test DD raises when alt_spacings don't add up to 1.""" + dd_sequence = [XGate(), XGate()] + pm = PassManager( + [ + ASAPScheduleAnalysis(self.durations), + PadDynamicalDecoupling( + self.durations, + dd_sequence, + alt_spacings=[0.1, 0.9, 0.1], + coupling_map=self.coupling_map, + ), + ] + ) + + with self.assertRaises(TranspilerError): + pm.run(self.ghz4) + + def test_unsupported_coupling_map(self): + """Test DD raises if coupling map is not supported.""" + dd_sequence = [XGate(), XGate()] + pm = PassManager( + [ + ASAPScheduleAnalysis(self.durations), + PadDynamicalDecoupling( + self.durations, + dd_sequence, + coupling_map=CouplingMap([[0, 1], [0, 2], [1, 2], [2, 3]]), + ), + ] + ) + + with self.assertRaises(TranspilerError): + pm.run(self.ghz4) + + def test_disjoint_coupling_map(self): + """Test staggered DD with disjoint coupling map.""" + qc = QuantumCircuit(5) + for q in range(5): + qc.x(q) + qc.barrier() + for q in range(5): + qc.delay(1600, q) + qc.barrier() + dd_sequence = [XGate(), XGate()] + pm = PassManager( + [ + ASAPScheduleAnalysis(self.durations), + PadDynamicalDecoupling( + self.durations, + dd_sequence, + coupling_map=CouplingMap([[0, 1], [1, 2], [3, 4]]), + schedule_idle_qubits=True, + ), + ] + ) + dd_qc = pm.run(qc) + + # ensure that delays for nearest neighbors are staggered + dag = circuit_to_dag(dd_qc) + delays = dag.op_nodes(Delay) + delay_dict = {q_ind: [] for q_ind in range(5)} + for delay in delays: + delay_dict[dag.find_bit(delay.qargs[0]).index] += [delay.op.duration] + self.assertNotEqual(delay_dict[0], delay_dict[1]) + self.assertNotEqual(delay_dict[1], delay_dict[2]) + self.assertNotEqual(delay_dict[3], delay_dict[4]) + self.assertEqual(delay_dict[0], delay_dict[2]) + + def test_no_unused_qubits(self): + """Test DD with if_test circuit that unused qubits are untouched and not scheduled. + + This ensures that programs don't have unnecessary information for unused qubits. + Which might hurt performance in later executon stages. + """ + + dd_sequence = [XGate(), XGate()] + pm = PassManager( + [ + ASAPScheduleAnalysis(self.durations), + PadDynamicalDecoupling( + self.durations, + dd_sequence, + pulse_alignment=1, + sequence_min_length_ratios=[0.0], + ), + ] + ) + + qc = QuantumCircuit(3, 1) + qc.measure(0, 0) + qc.x(1) + with qc.if_test((0, True)): + qc.x(1) + qc.measure(0, 0) + with qc.if_test((0, True)): + qc.x(0) + qc.x(1) + qc_dd = pm.run(qc) + dont_use = qc_dd.qubits[-1] + for op in qc_dd.data: + self.assertNotIn(dont_use, op.qubits) diff --git a/test/unit/transpiler/passes/scheduling/test_scheduler.py b/test/unit/transpiler/passes/scheduling/test_scheduler.py new file mode 100644 index 000000000..e9ed82e1f --- /dev/null +++ b/test/unit/transpiler/passes/scheduling/test_scheduler.py @@ -0,0 +1,1860 @@ +# This code is part of Qiskit. +# +# (C) Copyright IBM 2022. +# +# This code is licensed under the Apache License, Version 2.0. You may +# obtain a copy of this license in the LICENSE.txt file in the root directory +# of this source tree or at http://www.apache.org/licenses/LICENSE-2.0. +# +# Any modifications or derivative works of this code must retain this +# copyright notice, and modified files need to carry a notice indicating +# that they have been altered from the originals. + +"""Test the dynamic circuits scheduling analysis""" + +from unittest.mock import patch + +from qiskit import ClassicalRegister, QuantumCircuit, QuantumRegister, transpile +from qiskit.providers.fake_provider import FakeJakarta +from qiskit.pulse import Schedule, Play, Constant, DriveChannel +from qiskit.transpiler.passes import ConvertConditionsToIfOps +from qiskit.transpiler.passmanager import PassManager +from qiskit.transpiler.exceptions import TranspilerError + +from qiskit_ibm_runtime.transpiler.passes.scheduling.pad_delay import PadDelay +from qiskit_ibm_runtime.transpiler.passes.scheduling.scheduler import ( + ALAPScheduleAnalysis, + ASAPScheduleAnalysis, +) +from qiskit_ibm_runtime.transpiler.passes.scheduling.utils import ( + DynamicCircuitInstructionDurations, +) + +from .control_flow_test_case import ControlFlowTestCase + +# pylint: disable=invalid-name,not-context-manager + + +class TestASAPSchedulingAndPaddingPass(ControlFlowTestCase): + """Tests the ASAP Scheduling passes""" + + def test_if_test_gate_after_measure(self): + """Test if schedules circuits with c_if after measure with a common clbit. + See: https://github.com/Qiskit/qiskit-terra/issues/7654""" + qc = QuantumCircuit(2, 1) + qc.measure(0, 0) + with qc.if_test((0, 0)) as else_: + qc.x(1) + with else_: + qc.x(0) + + durations = DynamicCircuitInstructionDurations([("x", None, 200), ("measure", None, 840)]) + pm = PassManager([ASAPScheduleAnalysis(durations), PadDelay(schedule_idle_qubits=True)]) + scheduled = pm.run(qc) + + expected = QuantumCircuit(2, 1) + expected.delay(1000, 1) + expected.measure(0, 0) + with expected.if_test((0, 0)) as else_: + expected.delay(200, 0) + expected.x(1) + with else_: + expected.x(0) + expected.delay(200, 1) + + self.assertEqual(expected, scheduled) + + def test_c_if_raises(self): + """Verify that old format c_if raises an error.""" + qc = QuantumCircuit(2, 1) + qc.measure(0, 0) + qc.x(1).c_if(0, True) + + durations = DynamicCircuitInstructionDurations([("x", None, 200), ("measure", None, 840)]) + pm = PassManager([ASAPScheduleAnalysis(durations), PadDelay(schedule_idle_qubits=True)]) + with self.assertRaises(TranspilerError): + pm.run(qc) + + def test_c_if_conversion(self): + """Verify that old format c_if may be converted and scheduled.""" + qc = QuantumCircuit(1, 1) + qc.x(0).c_if(0, True) + + durations = DynamicCircuitInstructionDurations([("x", None, 200), ("measure", None, 840)]) + pm = PassManager( + [ + ConvertConditionsToIfOps(), + ASAPScheduleAnalysis(durations), + PadDelay(schedule_idle_qubits=True), + ] + ) + scheduled = pm.run(qc) + + expected = QuantumCircuit(1, 1) + with expected.if_test((0, 1)): + expected.x(0) + + self.assertEqual(expected, scheduled) + + def test_measure_after_measure(self): + """Test if schedules circuits with measure after measure with a common clbit. + + Note: There is no delay to write into the same clbit with IBM backends.""" + qc = QuantumCircuit(2, 1) + qc.x(0) + qc.measure(0, 0) + qc.measure(1, 0) + + durations = DynamicCircuitInstructionDurations([("x", None, 200), ("measure", None, 840)]) + pm = PassManager([ASAPScheduleAnalysis(durations), PadDelay(schedule_idle_qubits=True)]) + scheduled = pm.run(qc) + + expected = QuantumCircuit(2, 1) + expected.x(0) + expected.delay(200, 1) + expected.measure(0, 0) + expected.measure(1, 0) + self.assertEqual(expected, scheduled) + + def test_measure_block_not_end(self): + """Tests that measures trigger do not trigger the end of a scheduling block.""" + qc = QuantumCircuit(3, 1) + qc.x(0) + qc.measure(0, 0) + qc.measure(1, 0) + qc.x(2) + qc.measure(1, 0) + qc.measure(2, 0) + + durations = DynamicCircuitInstructionDurations([("x", None, 200), ("measure", None, 840)]) + pm = PassManager([ASAPScheduleAnalysis(durations), PadDelay(schedule_idle_qubits=True)]) + scheduled = pm.run(qc) + + expected = QuantumCircuit(3, 1) + expected.x(0) + expected.delay(200, 1) + expected.x(2) + expected.delay(1000, 2) + expected.measure(0, 0) + expected.measure(1, 0) + expected.delay(1000, 0) + expected.measure(1, 0) + expected.measure(2, 0) + + self.assertEqual(expected, scheduled) + + def test_reset_block_end(self): + """Tests that measures trigger do trigger the end of a scheduling block.""" + qc = QuantumCircuit(3, 1) + qc.x(0) + qc.reset(0) + qc.measure(1, 0) + qc.x(2) + qc.measure(1, 0) + qc.measure(2, 0) + + durations = DynamicCircuitInstructionDurations( + [("x", None, 200), ("measure", None, 840), ("reset", None, 840)] + ) + pm = PassManager([ASAPScheduleAnalysis(durations), PadDelay(schedule_idle_qubits=True)]) + scheduled = pm.run(qc) + + expected = QuantumCircuit(3, 1) + expected.x(0) + expected.delay(200, 1) + expected.x(2) + expected.delay(1000, 2) + expected.reset(0) + expected.measure(1, 0) + expected.barrier() + expected.delay(1000, 0) + expected.measure(1, 0) + expected.measure(2, 0) + + self.assertEqual(expected, scheduled) + + def test_c_if_on_different_qubits(self): + """Test if schedules circuits with `c_if`s on different qubits.""" + qc = QuantumCircuit(3, 1) + qc.measure(0, 0) + with qc.if_test((0, 1)): + qc.x(1) + qc.x(2) + + durations = DynamicCircuitInstructionDurations([("x", None, 200), ("measure", None, 840)]) + pm = PassManager([ASAPScheduleAnalysis(durations), PadDelay(schedule_idle_qubits=True)]) + scheduled = pm.run(qc) + + expected = QuantumCircuit(3, 1) + expected.delay(1000, 1) + expected.delay(1000, 2) + expected.measure(0, 0) + expected.barrier() + with expected.if_test((0, 1)): + expected.delay(200, 0) + expected.x(1) + expected.x(2) + + self.assertEqual(expected, scheduled) + + def test_shorter_measure_after_measure(self): + """Test if schedules circuits with shorter measure after measure + with a common clbit. + + Note: For dynamic circuits support we currently group measurements + to start at the same time which in turn trigger the end of a block.""" + qc = QuantumCircuit(3, 1) + qc.measure(0, 0) + qc.measure(1, 0) + + durations = DynamicCircuitInstructionDurations( + [("measure", [0], 840), ("measure", [1], 540)] + ) + pm = PassManager([ASAPScheduleAnalysis(durations), PadDelay(schedule_idle_qubits=True)]) + scheduled = pm.run(qc) + + expected = QuantumCircuit(3, 1) + expected.measure(0, 0) + expected.measure(1, 0) + expected.delay(300, 1) + expected.delay(1000, 2) + + self.assertEqual(expected, scheduled) + + def test_measure_after_c_if(self): + """Test if schedules circuits with c_if after measure with a common clbit.""" + qc = QuantumCircuit(3, 1) + qc.measure(0, 0) + with qc.if_test((0, 1)): + qc.x(1) + qc.measure(2, 0) + + durations = DynamicCircuitInstructionDurations([("x", None, 200), ("measure", None, 840)]) + pm = PassManager([ASAPScheduleAnalysis(durations), PadDelay(schedule_idle_qubits=True)]) + scheduled = pm.run(qc) + + expected = QuantumCircuit(3, 1) + expected.delay(1000, 1) + expected.delay(1000, 2) + expected.measure(0, 0) + expected.barrier() + with expected.if_test((0, 1)): + expected.delay(200, 0) + expected.x(1) + expected.delay(200, 2) + + expected.barrier() + expected.delay(1000, 0) + expected.measure(2, 0) + expected.delay(1000, 1) + + self.assertEqual(expected, scheduled) + + def test_parallel_gate_different_length(self): + """Test circuit having two parallel instruction with different length.""" + qc = QuantumCircuit(2, 2) + qc.x(0) + qc.x(1) + qc.measure(0, 0) + qc.measure(1, 1) + + durations = DynamicCircuitInstructionDurations( + [("x", [0], 200), ("x", [1], 400), ("measure", None, 840)] + ) + + pm = PassManager([ASAPScheduleAnalysis(durations), PadDelay(schedule_idle_qubits=True)]) + scheduled = pm.run(qc) + + expected = QuantumCircuit(2, 2) + expected.x(0) + expected.x(1) + expected.delay(200, 0) + expected.measure(0, 0) # immediately start after X gate + expected.measure(1, 1) + + self.assertEqual(scheduled, expected) + + def test_parallel_gate_different_length_with_barrier(self): + """Test circuit having two parallel instruction with different length with barrier.""" + qc = QuantumCircuit(2, 2) + qc.x(0) + qc.x(1) + qc.barrier() + qc.measure(0, 0) + qc.measure(1, 1) + + durations = DynamicCircuitInstructionDurations( + [("x", [0], 200), ("x", [1], 400), ("measure", None, 840)] + ) + + pm = PassManager([ASAPScheduleAnalysis(durations), PadDelay(schedule_idle_qubits=True)]) + scheduled = pm.run(qc) + + expected = QuantumCircuit(2, 2) + expected.x(0) + expected.delay(200, 0) + expected.x(1) + expected.barrier() + expected.measure(0, 0) + expected.measure(1, 1) + + self.assertEqual(scheduled, expected) + + def test_active_reset_circuit(self): + """Test practical example of reset circuit. + + Because of the stimulus pulse overlap with the previous XGate on the q register, + measure instruction is always triggered after XGate regardless of write latency. + Thus only conditional latency matters in the scheduling.""" + qc = QuantumCircuit(1, 1) + qc.measure(0, 0) + with qc.if_test((0, 1)): + qc.x(0) + qc.measure(0, 0) + with qc.if_test((0, 1)): + qc.x(0) + qc.measure(0, 0) + with qc.if_test((0, 1)): + qc.x(0) + + durations = DynamicCircuitInstructionDurations([("x", None, 100), ("measure", None, 840)]) + + scheduled = PassManager( + [ + ASAPScheduleAnalysis(durations), + PadDelay(schedule_idle_qubits=True), + ] + ).run(qc) + + expected = QuantumCircuit(1, 1) + expected.measure(0, 0) + with expected.if_test((0, 1)): + expected.x(0) + expected.measure(0, 0) + with expected.if_test((0, 1)): + expected.x(0) + expected.measure(0, 0) + with expected.if_test((0, 1)): + expected.x(0) + expected.barrier() + + self.assertEqual(expected, scheduled) + + def test_dag_introduces_extra_dependency_between_conditionals(self): + """Test dependency between conditional operations in the scheduling. + + In the below example circuit, the conditional x on q1 could start at time 0, + however it must be scheduled after the conditional x on q0 in scheduling. + That is because circuit model used in the transpiler passes (DAGCircuit) + interprets instructions acting on common clbits must be run in the order + given by the original circuit (QuantumCircuit).""" + qc = QuantumCircuit(2, 1) + qc.delay(100, 0) + with qc.if_test((0, 1)): + qc.x(0) + with qc.if_test((0, 1)): + qc.x(1) + + durations = DynamicCircuitInstructionDurations([("x", None, 160)]) + pm = PassManager([ASAPScheduleAnalysis(durations), PadDelay(schedule_idle_qubits=True)]) + scheduled = pm.run(qc) + + expected = QuantumCircuit(2, 1) + expected.delay(100, 0) + expected.delay(100, 1) + expected.barrier() + with expected.if_test((0, 1)): + expected.x(0) + expected.delay(160, 1) + with expected.if_test((0, 1)): + expected.delay(160, 0) + expected.x(1) + + self.assertEqual(expected, scheduled) + + def test_scheduling_with_calibration(self): + """Test if calibrated instruction can update node duration.""" + qc = QuantumCircuit(2) + qc.x(0) + qc.cx(0, 1) + qc.x(1) + qc.cx(0, 1) + + xsched = Schedule(Play(Constant(300, 0.1), DriveChannel(0))) + qc.add_calibration("x", (0,), xsched) + + durations = DynamicCircuitInstructionDurations([("x", None, 160), ("cx", None, 600)]) + pm = PassManager([ASAPScheduleAnalysis(durations), PadDelay(schedule_idle_qubits=True)]) + scheduled = pm.run(qc) + + expected = QuantumCircuit(2) + expected.x(0) + expected.delay(300, 1) + expected.cx(0, 1) + expected.x(1) + expected.delay(160, 0) + expected.cx(0, 1) + expected.add_calibration("x", (0,), xsched) + + self.assertEqual(expected, scheduled) + + def test_padding_not_working_without_scheduling(self): + """Test padding fails when un-scheduled DAG is input.""" + qc = QuantumCircuit(1, 1) + qc.delay(100, 0) + qc.x(0) + qc.measure(0, 0) + + with self.assertRaises(TranspilerError): + PassManager(PadDelay()).run(qc) + + def test_no_pad_very_end_of_circuit(self): + """Test padding option that inserts no delay at the very end of circuit. + + This circuit will be unchanged after scheduling/padding.""" + qc = QuantumCircuit(2, 1) + qc.delay(100, 0) + qc.x(1) + qc.measure(0, 0) + + durations = DynamicCircuitInstructionDurations([("x", None, 160), ("measure", None, 840)]) + + scheduled = PassManager( + [ + ASAPScheduleAnalysis(durations), + PadDelay(fill_very_end=False, schedule_idle_qubits=True), + ] + ).run(qc) + + expected = qc.copy() + + self.assertEqual(expected, scheduled) + + def test_reset_terminates_block(self): + """Test if reset operations terminate the block scheduled. + + Note: For dynamic circuits support we currently group resets + to start at the same time which in turn trigger the end of a block.""" + qc = QuantumCircuit(3, 1) + qc.x(0) + qc.reset(0) + qc.measure(1, 0) + qc.x(0) + + durations = DynamicCircuitInstructionDurations( + [ + ("x", None, 200), + ( + "reset", + [0], + 840, + ), # ignored as only the duration of the measurement is used for scheduling + ( + "reset", + [1], + 740, + ), # ignored as only the duration of the measurement is used for scheduling + ("measure", [0], 440), + ("measure", [1], 540), + ] + ) + pm = PassManager([ASAPScheduleAnalysis(durations), PadDelay(schedule_idle_qubits=True)]) + scheduled = pm.run(qc) + + expected = QuantumCircuit(3, 1) + expected.x(0) + expected.delay(200, 1) + expected.delay(900, 2) + expected.reset(0) + expected.delay(100, 0) + expected.measure(1, 0) + expected.barrier() + expected.x(0) + expected.delay(200, 1) + expected.delay(200, 2) + + self.assertEqual(expected, scheduled) + + def test_reset_merged_with_measure(self): + """Test if reset operations terminate the block scheduled. + + Note: For dynamic circuits support we currently group resets to start + at the same time which in turn trigger the end of a block.""" + qc = QuantumCircuit(3, 1) + qc.x(0) + qc.reset(0) + qc.measure(1, 0) + + durations = DynamicCircuitInstructionDurations( + [ + ("x", None, 200), + ( + "reset", + [0], + 840, + ), # ignored as only the duration of the measurement is used for scheduling + ( + "reset", + [1], + 740, + ), # ignored as only the duration of the measurement is used for scheduling + ("measure", [0], 440), + ("measure", [1], 540), + ] + ) + pm = PassManager([ASAPScheduleAnalysis(durations), PadDelay(schedule_idle_qubits=True)]) + scheduled = pm.run(qc) + + expected = QuantumCircuit(3, 1) + expected.x(0) + expected.delay(200, 1) + expected.delay(900, 2) + expected.reset(0) + expected.measure(1, 0) + expected.delay(100, 0) + + self.assertEqual(expected, scheduled) + + def test_scheduling_is_idempotent(self): + """Test that padding can be applied back to back without changing the circuit.""" + qc = QuantumCircuit(3, 2) + qc.x(2) + qc.cx(0, 1) + qc.barrier() + qc.measure(0, 0) + with qc.if_test((0, 1)): + qc.x(0) + qc.measure(0, 0) + with qc.if_test((0, 1)): + qc.x(0) + qc.measure(0, 0) + with qc.if_test((0, 1)): + qc.x(0) + + durations = DynamicCircuitInstructionDurations( + [("x", None, 100), ("measure", None, 840), ("cx", None, 500)] + ) + + scheduled0 = PassManager( + [ + ASAPScheduleAnalysis(durations), + PadDelay(schedule_idle_qubits=True), + ] + ).run(qc) + + scheduled1 = PassManager( + [ + ASAPScheduleAnalysis(durations), + PadDelay(schedule_idle_qubits=True), + ] + ).run(scheduled0) + + self.assertEqual(scheduled0, scheduled1) + + def test_gate_on_measured_qubit(self): + """Test that a gate on a previously measured qubit triggers the end of the block""" + qc = QuantumCircuit(2, 1) + qc.measure(0, 0) + qc.x(0) + qc.x(1) + + durations = DynamicCircuitInstructionDurations([("x", None, 200), ("measure", None, 840)]) + pm = PassManager([ASAPScheduleAnalysis(durations), PadDelay(schedule_idle_qubits=True)]) + scheduled = pm.run(qc) + + expected = QuantumCircuit(2, 1) + expected.x(1) + expected.delay(1000, 1) + expected.measure(0, 0) + expected.x(0) + + self.assertEqual(expected, scheduled) + + def test_grouped_measurements_prior_control_flow(self): + """Test that measurements are grouped prior to control-flow""" + qc = QuantumCircuit(3, 3) + qc.measure(0, 0) + qc.measure(1, 1) + with qc.if_test((0, 1)): + qc.x(2) + with qc.if_test((1, 1)): + qc.x(2) + qc.measure(2, 2) + + durations = DynamicCircuitInstructionDurations([("x", None, 200), ("measure", None, 840)]) + pm = PassManager([ASAPScheduleAnalysis(durations), PadDelay(schedule_idle_qubits=True)]) + scheduled = pm.run(qc) + + expected = QuantumCircuit(3, 3) + expected.delay(1000, 2) + expected.measure(0, 0) + expected.measure(1, 1) + expected.barrier() + with expected.if_test((0, 1)): + expected.delay(200, 0) + expected.delay(200, 1) + expected.x(2) + with expected.if_test((1, 1)): + expected.delay(200, 0) + expected.delay(200, 1) + expected.x(2) + expected.barrier() + expected.delay(1000, 0) + expected.delay(1000, 1) + expected.measure(2, 2) + + self.assertEqual(expected, scheduled) + + def test_back_to_back_c_if(self): + """Test back to back c_if scheduling""" + + qc = QuantumCircuit(3, 1) + qc.delay(800, 1) + with qc.if_test((0, 1)): + qc.x(1) + with qc.if_test((0, 1)): + qc.x(2) + + qc.delay(1000, 2) + qc.x(1) + + durations = DynamicCircuitInstructionDurations([("x", None, 200), ("measure", None, 840)]) + pm = PassManager([ASAPScheduleAnalysis(durations), PadDelay(schedule_idle_qubits=True)]) + scheduled = pm.run(qc) + + expected = QuantumCircuit(3, 1) + expected.delay(800, 0) + expected.delay(800, 1) + expected.delay(800, 2) + expected.barrier() + with expected.if_test((0, 1)): + expected.delay(200, 0) + expected.x(1) + expected.delay(200, 2) + with expected.if_test((0, 1)): + expected.delay(200, 0) + expected.delay(200, 1) + expected.x(2) + expected.barrier() + expected.delay(1000, 0) + expected.x(1) + expected.delay(800, 1) + expected.delay(1000, 2) + + self.assertEqual(expected, scheduled) + + def test_nested_control_scheduling(self): + """Test scheduling of nested control-flow""" + + qc = QuantumCircuit(4, 3) + qc.x(0) + with qc.if_test((0, 1)): + qc.x(1) + qc.measure(0, 1) + with qc.if_test((1, 0)): + qc.x(0) + qc.measure(2, 2) + qc.x(3) + + durations = DynamicCircuitInstructionDurations([("x", None, 200), ("measure", None, 840)]) + pm = PassManager([ASAPScheduleAnalysis(durations), PadDelay(schedule_idle_qubits=True)]) + scheduled = pm.run(qc) + + expected = QuantumCircuit(4, 3) + expected.x(0) + expected.delay(200, 1) + expected.delay(200, 2) + expected.delay(200, 3) + expected.barrier() + with expected.if_test((0, 1)): + expected.measure(0, 1) + expected.delay(1000, 1) + expected.delay(1000, 2) + expected.delay(1000, 3) + expected.barrier() + with expected.if_test((1, 0)): + expected.x(0) + expected.delay(800, 0) + expected.delay(1000, 1) + expected.measure(2, 2) + expected.delay(1000, 3) + expected.barrier() + expected.delay(200, 0) + expected.x(1) + expected.delay(200, 2) + expected.delay(200, 3) + expected.barrier() + expected.delay(200, 0) + expected.delay(200, 1) + expected.delay(200, 2) + expected.x(3) + + self.assertEqual(expected, scheduled) + + def test_while_loop(self): + """Test scheduling while loop""" + + qc = QuantumCircuit(2, 1) + qc.x(0) + with qc.while_loop((0, 1)): + qc.x(1) + qc.measure(0, 0) + qc.x(0) + + durations = DynamicCircuitInstructionDurations([("x", None, 200), ("measure", None, 840)]) + pm = PassManager([ASAPScheduleAnalysis(durations), PadDelay(schedule_idle_qubits=True)]) + scheduled = pm.run(qc) + + expected = QuantumCircuit(2, 1) + expected.x(0) + expected.delay(200, 1) + with expected.while_loop((0, 1)): + expected.x(1) + expected.delay(800, 1) + expected.measure(0, 0) + expected.x(0) + expected.delay(200, 1) + + self.assertEqual(expected, scheduled) + + def test_for_loop(self): + """Test scheduling for loop""" + + qc = QuantumCircuit(2, 1) + qc.x(0) + with qc.for_loop(range(2)): + qc.x(1) + qc.measure(0, 0) + qc.x(0) + + durations = DynamicCircuitInstructionDurations([("x", None, 200), ("measure", None, 840)]) + pm = PassManager([ASAPScheduleAnalysis(durations), PadDelay(schedule_idle_qubits=True)]) + scheduled = pm.run(qc) + + expected = QuantumCircuit(2, 1) + expected.x(0) + expected.delay(200, 1) + with expected.for_loop(range(2)): + expected.x(1) + expected.delay(800, 1) + expected.measure(0, 0) + expected.x(0) + expected.delay(200, 1) + + self.assertEqual(expected, scheduled) + + def test_registers(self): + """Verify scheduling works with registers.""" + qr = QuantumRegister(1, name="q") + cr = ClassicalRegister(1) + qc = QuantumCircuit(qr, cr) + with qc.if_test((cr[0], True)): + qc.x(qr[0]) + + durations = DynamicCircuitInstructionDurations([("x", None, 200), ("measure", None, 840)]) + pm = PassManager( + [ + ConvertConditionsToIfOps(), + ASAPScheduleAnalysis(durations), + PadDelay(schedule_idle_qubits=True), + ] + ) + scheduled = pm.run(qc) + + expected = QuantumCircuit(qr, cr) + with expected.if_test((cr[0], True)): + expected.x(qr[0]) + + self.assertEqual(expected, scheduled) + + def test_c_if_plugin_conversion_with_transpile(self): + """Verify that old format c_if may be converted and scheduled + after transpilation with the plugin.""" + # Patch the test backend with the plugin + with patch.object( + FakeJakarta, + "get_translation_stage_plugin", + return_value="ibm_dynamic_circuits", + create=True, + ): + backend = FakeJakarta() + # Temporary workaround for mock backends. For real backends this is not required. + backend.configuration().basis_gates.append("if_else") + + durations = DynamicCircuitInstructionDurations.from_backend(backend) + pm = PassManager([ASAPScheduleAnalysis(durations), PadDelay(schedule_idle_qubits=True)]) + + qr0 = QuantumRegister(1, name="q") + cr = ClassicalRegister(1, name="c") + qc = QuantumCircuit(qr0, cr) + qc.x(qr0[0]).c_if(cr[0], True) + + qc_transpiled = transpile(qc, backend, initial_layout=[0]) + + scheduled = pm.run(qc_transpiled) + + qr1 = QuantumRegister(7, name="q") + cr = ClassicalRegister(1, name="c") + expected = QuantumCircuit(qr1, cr) + with expected.if_test((cr[0], True)): + expected.x(qr1[0]) + expected.delay(160, qr1[1]) + expected.delay(160, qr1[2]) + expected.delay(160, qr1[3]) + expected.delay(160, qr1[4]) + expected.delay(160, qr1[5]) + expected.delay(160, qr1[6]) + + self.assertEqual(expected, scheduled) + + +class TestALAPSchedulingAndPaddingPass(ControlFlowTestCase): + """Tests the ALAP Scheduling passes""" + + def test_alap(self): + """Test standard ALAP scheduling""" + durations = DynamicCircuitInstructionDurations([("x", None, 200), ("measure", None, 840)]) + qc = QuantumCircuit(3, 1) + qc.measure(0, 0) + qc.x(1) + + pm = PassManager([ALAPScheduleAnalysis(durations), PadDelay(schedule_idle_qubits=True)]) + scheduled = pm.run(qc) + + expected = QuantumCircuit(3, 1) + expected.measure(0, 0) + expected.delay(800, 1) + expected.x(1) + expected.delay(1000, 2) + + self.assertEqual(expected, scheduled) + + def test_if_test_gate_after_measure(self): + """Test if schedules circuits with c_if after measure with a common clbit. + See: https://github.com/Qiskit/qiskit-terra/issues/7654""" + qc = QuantumCircuit(2, 1) + qc.measure(0, 0) + with qc.if_test((0, 0)) as else_: + qc.x(1) + with else_: + qc.x(0) + + durations = DynamicCircuitInstructionDurations([("x", None, 200), ("measure", None, 840)]) + pm = PassManager([ALAPScheduleAnalysis(durations), PadDelay(schedule_idle_qubits=True)]) + scheduled = pm.run(qc) + + expected = QuantumCircuit(2, 1) + expected.delay(1000, 1) + expected.measure(0, 0) + with expected.if_test((0, 0)) as else_: + expected.delay(200, 0) + expected.x(1) + with else_: + expected.x(0) + expected.delay(200, 1) + + self.assertEqual(expected, scheduled) + + def test_classically_controlled_gate_after_measure(self): + """Test if schedules circuits with c_if after measure with a common clbit. + See: https://github.com/Qiskit/qiskit-terra/issues/7654""" + qc = QuantumCircuit(2, 1) + qc.measure(0, 0) + with qc.if_test((0, True)): + qc.x(1) + + durations = DynamicCircuitInstructionDurations([("x", None, 200), ("measure", None, 840)]) + pm = PassManager([ALAPScheduleAnalysis(durations), PadDelay(schedule_idle_qubits=True)]) + scheduled = pm.run(qc) + + expected = QuantumCircuit(2, 1) + expected.delay(1000, 1) + expected.measure(0, 0) + expected.barrier() + with expected.if_test((0, True)): + expected.delay(200, 0) + expected.x(1) + + self.assertEqual(expected, scheduled) + + def test_measure_after_measure(self): + """Test if schedules circuits with measure after measure with a common clbit. + + Note: There is no delay to write into the same clbit with IBM backends.""" + qc = QuantumCircuit(2, 1) + qc.x(0) + qc.measure(0, 0) + qc.measure(1, 0) + + durations = DynamicCircuitInstructionDurations([("x", None, 200), ("measure", None, 840)]) + pm = PassManager([ALAPScheduleAnalysis(durations), PadDelay(schedule_idle_qubits=True)]) + scheduled = pm.run(qc) + + expected = QuantumCircuit(2, 1) + expected.x(0) + expected.delay(200, 1) + expected.measure(0, 0) + expected.measure(1, 0) + + self.assertEqual(expected, scheduled) + + def test_measure_block_not_end(self): + """Tests that measures trigger do not trigger the end of a scheduling block.""" + qc = QuantumCircuit(3, 1) + qc.x(0) + qc.measure(0, 0) + qc.measure(1, 0) + qc.x(2) + qc.measure(1, 0) + qc.measure(2, 0) + + durations = DynamicCircuitInstructionDurations([("x", None, 200), ("measure", None, 840)]) + pm = PassManager([ALAPScheduleAnalysis(durations), PadDelay(schedule_idle_qubits=True)]) + scheduled = pm.run(qc) + + expected = QuantumCircuit(3, 1) + expected.x(0) + expected.delay(200, 1) + expected.delay(1000, 2) + expected.x(2) + expected.measure(0, 0) + expected.delay(1000, 0) + expected.measure(1, 0) + expected.measure(1, 0) + expected.measure(2, 0) + + self.assertEqual(expected, scheduled) + + def test_reset_block_end(self): + """Tests that measures trigger do trigger the end of a scheduling block.""" + qc = QuantumCircuit(3, 1) + qc.x(0) + qc.reset(0) + qc.measure(1, 0) + qc.x(2) + qc.measure(1, 0) + qc.measure(2, 0) + qc.measure(0, 0) + + durations = DynamicCircuitInstructionDurations( + [("x", None, 200), ("measure", None, 840), ("reset", None, 840)] + ) + pm = PassManager([ALAPScheduleAnalysis(durations), PadDelay(schedule_idle_qubits=True)]) + scheduled = pm.run(qc) + + expected = QuantumCircuit(3, 1) + expected.x(0) + expected.reset(0) + expected.delay(200, 1) + expected.delay(1000, 2) + expected.x(2) + expected.measure(1, 0) + expected.barrier() + expected.measure(1, 0) + expected.measure(2, 0) + expected.measure(0, 0) + + self.assertEqual(expected, scheduled) + + def test_c_if_on_different_qubits(self): + """Test if schedules circuits with `c_if`s on different qubits.""" + qc = QuantumCircuit(3, 1) + qc.measure(0, 0) + with qc.if_test((0, 1)): + qc.x(1) + qc.x(2) + + durations = DynamicCircuitInstructionDurations([("x", None, 200), ("measure", None, 840)]) + pm = PassManager([ALAPScheduleAnalysis(durations), PadDelay(schedule_idle_qubits=True)]) + scheduled = pm.run(qc) + + expected = QuantumCircuit(3, 1) + expected.delay(1000, 1) + expected.delay(1000, 2) + expected.measure(0, 0) + expected.barrier() + with expected.if_test((0, 1)): + expected.delay(200, 0) + expected.x(1) + expected.x(2) + + self.assertEqual(expected, scheduled) + + def test_shorter_measure_after_measure(self): + """Test if schedules circuits with shorter measure after measure + with a common clbit. + + Note: For dynamic circuits support we currently group measurements + to start at the same time which in turn trigger the end of a block.""" + qc = QuantumCircuit(3, 1) + qc.measure(0, 0) + qc.measure(1, 0) + + durations = DynamicCircuitInstructionDurations( + [("measure", [0], 840), ("measure", [1], 540)] + ) + pm = PassManager([ALAPScheduleAnalysis(durations), PadDelay(schedule_idle_qubits=True)]) + scheduled = pm.run(qc) + + expected = QuantumCircuit(3, 1) + expected.delay(1000, 2) + expected.measure(0, 0) + expected.measure(1, 0) + expected.delay(300, 1) + + self.assertEqual(expected, scheduled) + + def test_measure_after_c_if(self): + """Test if schedules circuits with c_if after measure with a common clbit.""" + qc = QuantumCircuit(3, 1) + qc.measure(0, 0) + with qc.if_test((0, 1)): + qc.x(1) + qc.measure(2, 0) + + durations = DynamicCircuitInstructionDurations([("x", None, 200), ("measure", None, 840)]) + pm = PassManager([ALAPScheduleAnalysis(durations), PadDelay(schedule_idle_qubits=True)]) + scheduled = pm.run(qc) + + expected = QuantumCircuit(3, 1) + expected.delay(1000, 1) + expected.delay(1000, 2) + expected.measure(0, 0) + expected.barrier() + with expected.if_test((0, 1)): + expected.delay(200, 0) + expected.x(1) + expected.delay(200, 2) + expected.barrier() + expected.delay(1000, 0) + expected.measure(2, 0) + expected.delay(1000, 1) + + self.assertEqual(expected, scheduled) + + def test_parallel_gate_different_length(self): + """Test circuit having two parallel instruction with different length.""" + qc = QuantumCircuit(2, 2) + qc.x(0) + qc.x(1) + qc.measure(0, 0) + qc.measure(1, 1) + + durations = DynamicCircuitInstructionDurations( + [("x", [0], 200), ("x", [1], 400), ("measure", None, 840)] + ) + + pm = PassManager([ALAPScheduleAnalysis(durations), PadDelay(schedule_idle_qubits=True)]) + scheduled = pm.run(qc) + + expected = QuantumCircuit(2, 2) + expected.delay(200, 0) + expected.x(0) + expected.x(1) + expected.measure(0, 0) # immediately start after X gate + expected.measure(1, 1) + + self.assertEqual(scheduled, expected) + + def test_parallel_gate_different_length_with_barrier(self): + """Test circuit having two parallel instruction with different length with barrier.""" + qc = QuantumCircuit(2, 2) + qc.x(0) + qc.x(1) + qc.barrier() + qc.measure(0, 0) + qc.measure(1, 1) + + durations = DynamicCircuitInstructionDurations( + [("x", [0], 200), ("x", [1], 400), ("measure", None, 840)] + ) + + pm = PassManager([ALAPScheduleAnalysis(durations), PadDelay(schedule_idle_qubits=True)]) + scheduled = pm.run(qc) + + expected = QuantumCircuit(2, 2) + expected.delay(200, 0) + expected.x(0) + expected.x(1) + expected.barrier() + expected.measure(0, 0) + expected.measure(1, 1) + + self.assertEqual(scheduled, expected) + + def test_active_reset_circuit(self): + """Test practical example of reset circuit. + + Because of the stimulus pulse overlap with the previous XGate on the q register, + measure instruction is always triggered after XGate regardless of write latency. + Thus only conditional latency matters in the scheduling.""" + qc = QuantumCircuit(1, 1) + qc.measure(0, 0) + with qc.if_test((0, 1)): + qc.x(0) + qc.measure(0, 0) + with qc.if_test((0, 1)): + qc.x(0) + qc.measure(0, 0) + with qc.if_test((0, 1)): + qc.x(0) + + durations = DynamicCircuitInstructionDurations([("x", None, 100), ("measure", None, 840)]) + + scheduled = PassManager( + [ + ALAPScheduleAnalysis(durations), + PadDelay(schedule_idle_qubits=True), + ] + ).run(qc) + + expected = QuantumCircuit(1, 1) + expected.measure(0, 0) + with expected.if_test((0, 1)): + expected.x(0) + expected.measure(0, 0) + with expected.if_test((0, 1)): + expected.x(0) + expected.measure(0, 0) + with expected.if_test((0, 1)): + expected.x(0) + expected.barrier() + + self.assertEqual(expected, scheduled) + + def test_dag_introduces_extra_dependency_between_conditionals(self): + """Test dependency between conditional operations in the scheduling. + + In the below example circuit, the conditional x on q1 could start at time 0, + however it must be scheduled after the conditional x on q0 in scheduling. + That is because circuit model used in the transpiler passes (DAGCircuit) + interprets instructions acting on common clbits must be run in the order + given by the original circuit (QuantumCircuit).""" + qc = QuantumCircuit(2, 1) + qc.delay(100, 0) + with qc.if_test((0, 1)): + qc.x(0) + with qc.if_test((0, 1)): + qc.x(1) + + durations = DynamicCircuitInstructionDurations([("x", None, 160)]) + pm = PassManager([ALAPScheduleAnalysis(durations), PadDelay(schedule_idle_qubits=True)]) + scheduled = pm.run(qc) + + expected = QuantumCircuit(2, 1) + expected.delay(100, 0) + expected.delay(100, 1) + expected.barrier() + with expected.if_test((0, 1)): + expected.x(0) + expected.delay(160, 1) + with expected.if_test((0, 1)): + expected.delay(160, 0) + expected.x(1) + + self.assertEqual(expected, scheduled) + + def test_scheduling_with_calibration(self): + """Test if calibrated instruction can update node duration.""" + qc = QuantumCircuit(2) + qc.x(0) + qc.cx(0, 1) + qc.x(1) + qc.cx(0, 1) + + xsched = Schedule(Play(Constant(300, 0.1), DriveChannel(0))) + qc.add_calibration("x", (0,), xsched) + + durations = DynamicCircuitInstructionDurations([("x", None, 160), ("cx", None, 600)]) + pm = PassManager([ALAPScheduleAnalysis(durations), PadDelay(schedule_idle_qubits=True)]) + scheduled = pm.run(qc) + + expected = QuantumCircuit(2) + expected.x(0) + expected.delay(300, 1) + expected.cx(0, 1) + expected.x(1) + expected.delay(160, 0) + expected.cx(0, 1) + expected.add_calibration("x", (0,), xsched) + + self.assertEqual(expected, scheduled) + + def test_padding_not_working_without_scheduling(self): + """Test padding fails when un-scheduled DAG is input.""" + qc = QuantumCircuit(1, 1) + qc.delay(100, 0) + qc.x(0) + qc.measure(0, 0) + + with self.assertRaises(TranspilerError): + PassManager(PadDelay()).run(qc) + + def test_no_pad_very_end_of_circuit(self): + """Test padding option that inserts no delay at the very end of circuit. + + This circuit will be unchanged after scheduling/padding.""" + qc = QuantumCircuit(2, 1) + qc.delay(100, 0) + qc.x(1) + qc.measure(0, 0) + + durations = DynamicCircuitInstructionDurations([("x", None, 160), ("measure", None, 840)]) + + scheduled = PassManager( + [ + ALAPScheduleAnalysis(durations), + PadDelay(fill_very_end=False, schedule_idle_qubits=True), + ] + ).run(qc) + + expected = QuantumCircuit(2, 1) + expected.delay(100, 0) + expected.measure(0, 0) + expected.delay(940, 1) + expected.x(1) + + self.assertEqual(expected, scheduled) + + def test_reset_terminates_block(self): + """Test if reset operations terminate the block scheduled. + + Note: For dynamic circuits support we currently group resets + to start at the same time which in turn trigger the end of a block.""" + qc = QuantumCircuit(3, 1) + qc.x(0) + qc.reset(0) + qc.measure(1, 0) + qc.x(0) + + durations = DynamicCircuitInstructionDurations( + [ + ("x", None, 200), + ( + "reset", + [0], + 840, + ), # ignored as only the duration of the measurement is used for scheduling + ( + "reset", + [1], + 740, + ), # ignored as only the duration of the measurement is used for scheduling + ("measure", [0], 440), + ("measure", [1], 540), + ] + ) + pm = PassManager([ALAPScheduleAnalysis(durations), PadDelay(schedule_idle_qubits=True)]) + scheduled = pm.run(qc) + + expected = QuantumCircuit(3, 1) + expected.x(0) + expected.delay(200, 1) + expected.delay(900, 2) + expected.reset(0) + expected.delay(100, 0) + expected.measure(1, 0) + expected.barrier() + expected.x(0) + expected.delay(200, 1) + expected.delay(200, 2) + + self.assertEqual(expected, scheduled) + + def test_reset_merged_with_measure(self): + """Test if reset operations terminate the block scheduled. + + Note: For dynamic circuits support we currently group resets to start + at the same time which in turn trigger the end of a block.""" + qc = QuantumCircuit(3, 1) + qc.x(0) + qc.reset(0) + qc.measure(1, 0) + + durations = DynamicCircuitInstructionDurations( + [ + ("x", None, 200), + ( + "reset", + [0], + 840, + ), # ignored as only the duration of the measurement is used for scheduling + ( + "reset", + [1], + 740, + ), # ignored as only the duration of the measurement is used for scheduling + ("measure", [0], 440), + ("measure", [1], 540), + ] + ) + pm = PassManager([ALAPScheduleAnalysis(durations), PadDelay(schedule_idle_qubits=True)]) + scheduled = pm.run(qc) + + expected = QuantumCircuit(3, 1) + expected.x(0) + expected.delay(200, 1) + expected.delay(900, 2) + expected.reset(0) + expected.delay(100, 0) + expected.measure(1, 0) + + self.assertEqual(expected, scheduled) + + def test_already_scheduled(self): + """Test no changes to pre-scheduled""" + qc = QuantumCircuit(3, 2) + qc.cx(0, 1) + qc.delay(400, 2) + qc.x(2) + qc.barrier() + qc.measure(0, 0) + qc.delay(1000, 1) + qc.delay(1000, 2) + qc.barrier() + with qc.if_test((0, 1)): + qc.x(0) + qc.delay(100, 1) + qc.delay(100, 2) + qc.barrier() + qc.measure(0, 0) + qc.delay(1000, 1) + qc.delay(1000, 2) + qc.barrier() + + durations = DynamicCircuitInstructionDurations( + [("x", None, 100), ("measure", None, 840), ("cx", None, 500)] + ) + + scheduled = PassManager( + [ + ALAPScheduleAnalysis(durations), + PadDelay(schedule_idle_qubits=True), + ] + ).run(qc) + + self.assertEqual(qc, scheduled) + + def test_scheduling_is_idempotent(self): + """Test that padding can be applied back to back without changing the circuit.""" + qc = QuantumCircuit(3, 2) + qc.x(2) + qc.cx(0, 1) + qc.barrier() + qc.measure(0, 0) + with qc.if_test((0, 1)): + qc.x(0) + qc.measure(0, 0) + + durations = DynamicCircuitInstructionDurations( + [("x", None, 100), ("measure", None, 840), ("cx", None, 500)] + ) + + scheduled0 = PassManager( + [ + ALAPScheduleAnalysis(durations), + PadDelay(schedule_idle_qubits=True), + ] + ).run(qc) + + scheduled1 = PassManager( + [ + ALAPScheduleAnalysis(durations), + PadDelay(schedule_idle_qubits=True), + ] + ).run(scheduled0) + + self.assertEqual(scheduled0, scheduled1) + + def test_gate_on_measured_qubit(self): + """Test that a gate on a previously measured qubit triggers the end of the block""" + qc = QuantumCircuit(2, 1) + qc.measure(0, 0) + qc.x(0) + qc.x(1) + + durations = DynamicCircuitInstructionDurations([("x", None, 200), ("measure", None, 840)]) + pm = PassManager([ALAPScheduleAnalysis(durations), PadDelay(schedule_idle_qubits=True)]) + scheduled = pm.run(qc) + + expected = QuantumCircuit(2, 1) + expected.delay(1000, 1) + expected.x(1) + expected.measure(0, 0) + expected.x(0) + + self.assertEqual(expected, scheduled) + + def test_grouped_measurements_prior_control_flow(self): + """Test that measurements are grouped prior to control-flow""" + qc = QuantumCircuit(3, 3) + qc.measure(0, 0) + qc.measure(1, 1) + with qc.if_test((0, 1)): + qc.x(2) + with qc.if_test((1, 1)): + qc.x(2) + qc.measure(2, 2) + + durations = DynamicCircuitInstructionDurations([("x", None, 200), ("measure", None, 840)]) + pm = PassManager([ALAPScheduleAnalysis(durations), PadDelay(schedule_idle_qubits=True)]) + scheduled = pm.run(qc) + + expected = QuantumCircuit(3, 3) + expected.delay(1000, 2) + expected.measure(0, 0) + expected.measure(1, 1) + expected.barrier() + with expected.if_test((0, 1)): + expected.delay(200, 0) + expected.delay(200, 1) + expected.x(2) + with expected.if_test((1, 1)): + expected.delay(200, 0) + expected.delay(200, 1) + expected.x(2) + expected.barrier() + expected.delay(1000, 0) + expected.delay(1000, 1) + expected.measure(2, 2) + + self.assertEqual(expected, scheduled) + + def test_fast_path_eligible_scheduling(self): + """Test scheduling of the fast-path eligible blocks. + Verify that no barrier is inserted between measurements and fast-path conditionals. + """ + qc = QuantumCircuit(4, 3) + qc.x(0) + qc.delay(1500, 1) + qc.delay(1500, 2) + qc.x(3) + qc.barrier(1, 2, 3) + qc.measure(0, 0) + qc.measure(1, 1) + with qc.if_test((0, 1)): + qc.x(0) + with qc.if_test((1, 1)): + qc.x(1) + qc.x(0) + qc.x(0) + qc.x(1) + qc.x(2) + + durations = DynamicCircuitInstructionDurations([("x", None, 200), ("measure", None, 840)]) + pm = PassManager([ALAPScheduleAnalysis(durations), PadDelay(schedule_idle_qubits=True)]) + scheduled = pm.run(qc) + + expected = QuantumCircuit(4, 3) + expected.delay(1300, 0) + expected.x(0) + expected.delay(1500, 1) + expected.delay(1500, 2) + expected.delay(1300, 3) + expected.x(3) + expected.barrier(1, 2, 3) + expected.measure(0, 0) + expected.measure(1, 1) + expected.delay(1000, 2) + expected.delay(1000, 3) + with expected.if_test((0, 1)): + expected.x(0) + with expected.if_test((1, 1)): + expected.x(1) + expected.barrier() + expected.x(0) + expected.x(0) + expected.delay(200, 1) + expected.x(1) + expected.delay(200, 2) + expected.x(2) + expected.delay(400, 3) + + self.assertEqual(expected, scheduled) + + def test_back_to_back_c_if(self): + """Test back to back c_if scheduling""" + + qc = QuantumCircuit(3, 1) + qc.delay(800, 1) + with qc.if_test((0, 1)): + qc.x(1) + with qc.if_test((0, 1)): + qc.x(2) + + qc.delay(1000, 2) + qc.x(1) + + durations = DynamicCircuitInstructionDurations([("x", None, 200), ("measure", None, 840)]) + pm = PassManager([ALAPScheduleAnalysis(durations), PadDelay(schedule_idle_qubits=True)]) + scheduled = pm.run(qc) + + expected = QuantumCircuit(3, 1) + expected.delay(800, 0) + expected.delay(800, 1) + expected.delay(800, 2) + expected.barrier() + with expected.if_test((0, 1)): + expected.delay(200, 0) + expected.x(1) + expected.delay(200, 2) + with expected.if_test((0, 1)): + expected.delay(200, 0) + expected.delay(200, 1) + expected.x(2) + expected.barrier() + expected.delay(1000, 0) + expected.delay(800, 1) + expected.x(1) + expected.delay(1000, 2) + self.assertEqual(expected, scheduled) + + def test_issue_458_extra_idle_bug_0(self): + """Regression test for https://github.com/Qiskit/qiskit-ibm-provider/issues/458 + + This demonstrates that delays on idle qubits are pushed to the last schedulable + region. This may happen if Terra's default scheduler is used and then the + dynamic circuit scheduler is applied. + """ + + qc = QuantumCircuit(4, 3) + + qc.cx(0, 1) + qc.delay(700, 0) + qc.delay(700, 2) + qc.cx(1, 2) + qc.delay(3560, 3) + qc.barrier([0, 1, 2]) + + qc.delay(1160, 0) + qc.delay(1000, 2) + qc.measure(1, 0) + qc.delay(160, 1) + with qc.if_test((0, 1)): + qc.x(2) + qc.barrier([0, 1, 2]) + qc.measure(0, 1) + qc.delay(1000, 1) + qc.measure(2, 2) + + durations = DynamicCircuitInstructionDurations( + [("x", None, 160), ("cx", None, 700), ("measure", None, 840)] + ) + + pm = PassManager([ALAPScheduleAnalysis(durations), PadDelay(schedule_idle_qubits=True)]) + scheduled = pm.run(qc) + + expected = QuantumCircuit(4, 3) + expected.cx(0, 1) + expected.delay(700, 0) + expected.delay(700, 2) + expected.cx(1, 2) + expected.barrier([0, 1, 2]) + expected.delay(2560, 3) + + expected.delay(1160, 0) + expected.delay(1160, 2) + expected.measure(1, 0) + expected.delay(160, 1) + expected.barrier() + with expected.if_test((0, 1)): + expected.delay(160, 0) + expected.delay(160, 1) + expected.x(2) + expected.delay(160, 3) + expected.barrier() + expected.delay(2560, 0) + expected.delay(2560, 1) + expected.delay(2560, 2) + expected.delay(3560, 3) + expected.barrier([0, 1, 2]) + expected.delay(1000, 1) + expected.measure(0, 1) + expected.measure(2, 2) + + self.assertEqual(scheduled, expected) + + def test_issue_458_extra_idle_bug_1(self): + """Regression test for https://github.com/Qiskit/qiskit-ibm-provider/issues/458 + + This demonstrates that a bug with a double-delay insertion has been resolved. + """ + + qc = QuantumCircuit(3, 3) + + qc.rz(0, 2) + qc.barrier() + qc.measure(1, 0) + + durations = DynamicCircuitInstructionDurations( + [("rz", None, 0), ("cx", None, 700), ("measure", None, 840)] + ) + + pm = PassManager([ALAPScheduleAnalysis(durations), PadDelay(schedule_idle_qubits=True)]) + scheduled = pm.run(qc) + + expected = QuantumCircuit(3, 3) + + expected.rz(0, 2) + expected.barrier() + expected.delay(1000, 0) + expected.measure(1, 0) + expected.delay(1000, 2) + + self.assertEqual(scheduled, expected) + + def test_nested_control_scheduling(self): + """Test scheduling of nested control-flow""" + + qc = QuantumCircuit(4, 3) + qc.x(0) + with qc.if_test((0, 1)): + qc.x(1) + qc.measure(0, 1) + with qc.if_test((1, 0)): + qc.x(0) + qc.measure(2, 2) + qc.x(3) + + durations = DynamicCircuitInstructionDurations([("x", None, 200), ("measure", None, 840)]) + pm = PassManager([ALAPScheduleAnalysis(durations), PadDelay(schedule_idle_qubits=True)]) + scheduled = pm.run(qc) + + expected = QuantumCircuit(4, 3) + expected.x(0) + expected.delay(200, 1) + expected.delay(200, 2) + expected.delay(200, 3) + expected.barrier() + with expected.if_test((0, 1)): + expected.measure(0, 1) + expected.delay(1000, 1) + expected.delay(1000, 2) + expected.delay(1000, 3) + expected.barrier() + with expected.if_test((1, 0)): + expected.delay(800, 0) + expected.x(0) + expected.delay(1000, 1) + expected.measure(2, 2) + expected.delay(1000, 3) + expected.barrier() + expected.delay(200, 0) + expected.x(1) + expected.delay(200, 2) + expected.delay(200, 3) + expected.barrier() + expected.delay(200, 0) + expected.delay(200, 1) + expected.delay(200, 2) + expected.x(3) + + self.assertEqual(expected, scheduled) + + def test_while_loop(self): + """Test scheduling while loop""" + + qc = QuantumCircuit(2, 1) + qc.x(0) + with qc.while_loop((0, 1)): + qc.x(1) + qc.measure(0, 0) + qc.x(0) + + durations = DynamicCircuitInstructionDurations([("x", None, 200), ("measure", None, 840)]) + pm = PassManager([ALAPScheduleAnalysis(durations), PadDelay(schedule_idle_qubits=True)]) + scheduled = pm.run(qc) + + expected = QuantumCircuit(2, 1) + expected.x(0) + expected.delay(200, 1) + with expected.while_loop((0, 1)): + expected.delay(800, 1) + expected.x(1) + expected.measure(0, 0) + expected.x(0) + expected.delay(200, 1) + + self.assertEqual(expected, scheduled) + + def test_for_loop(self): + """Test scheduling for loop""" + + qc = QuantumCircuit(2, 1) + qc.x(0) + with qc.for_loop(range(2)): + qc.x(1) + qc.measure(0, 0) + qc.x(0) + + durations = DynamicCircuitInstructionDurations([("x", None, 200), ("measure", None, 840)]) + pm = PassManager([ALAPScheduleAnalysis(durations), PadDelay(schedule_idle_qubits=True)]) + scheduled = pm.run(qc) + + expected = QuantumCircuit(2, 1) + expected.x(0) + expected.delay(200, 1) + with expected.for_loop(range(2)): + expected.delay(800, 1) + expected.x(1) + expected.measure(0, 0) + expected.x(0) + expected.delay(200, 1) + + self.assertEqual(expected, scheduled) + + def test_transpile_mock_backend(self): + """Test scheduling works with transpilation.""" + backend = FakeJakarta() + # Temporary workaround for mock backends. For real backends this is not required. + backend.configuration().basis_gates.append("if_else") + backend.configuration().basis_gates.append("while_loop") + + durations = DynamicCircuitInstructionDurations.from_backend(backend) + pm = PassManager([ALAPScheduleAnalysis(durations), PadDelay(schedule_idle_qubits=True)]) + + qr = QuantumRegister(3) + cr = ClassicalRegister(2) + + qc = QuantumCircuit(qr, cr) + with qc.while_loop((cr[0], 1)): + qc.x(qr[2]) + with qc.if_test((cr[0], 1)): + qc.x(qr[1]) + qc.x(qr[0]) + + qc_transpiled = transpile(qc, backend, initial_layout=[0, 1, 2]) + + scheduled = pm.run(qc_transpiled) + + qr = QuantumRegister(7, name="q") + expected = QuantumCircuit(qr, cr) + with expected.while_loop((cr[0], 1)): + with expected.if_test((cr[0], 1)): + expected.delay(160, qr[0]) + expected.x(qr[1]) + expected.delay(160, qr[2]) + expected.delay(160, qr[3]) + expected.delay(160, qr[4]) + expected.delay(160, qr[5]) + expected.delay(160, qr[6]) + expected.barrier() + expected.x(qr[0]) + expected.x(qr[2]) + expected.delay(160, qr[1]) + expected.delay(160, qr[3]) + expected.delay(160, qr[4]) + expected.delay(160, qr[5]) + expected.delay(160, qr[6]) + + self.assertEqual(expected, scheduled) + + def test_transpile_both_paths(self): + """Test scheduling works with both fast- and standard path after transpiling.""" + backend = FakeJakarta() + # Temporary workaround for mock backends. For real backends this is not required. + backend.configuration().basis_gates.append("if_else") + + durations = DynamicCircuitInstructionDurations.from_backend(backend) + pm = PassManager([ALAPScheduleAnalysis(durations), PadDelay(schedule_idle_qubits=True)]) + + qr = QuantumRegister(3) + cr = ClassicalRegister(2) + + qc = QuantumCircuit(qr, cr) + qc.measure(qr[0], cr[0]) + with qc.if_test((cr[0], 1)): + qc.x(qr[0]) + with qc.if_test((cr[0], 1)): + qc.x(qr[1]) + + qc_transpiled = transpile(qc, backend, initial_layout=[0, 1, 2]) + + scheduled = pm.run(qc_transpiled) + + qr = QuantumRegister(7, name="q") + expected = QuantumCircuit(qr, cr) + expected.delay(24080, qr[1]) + expected.delay(24080, qr[2]) + expected.delay(24080, qr[3]) + expected.delay(24080, qr[4]) + expected.delay(24080, qr[5]) + expected.delay(24080, qr[6]) + expected.measure(qr[0], cr[0]) + with expected.if_test((cr[0], 1)): + expected.x(qr[0]) + with expected.if_test((cr[0], 1)): + expected.delay(160, qr[0]) + expected.x(qr[1]) + expected.delay(160, qr[2]) + expected.delay(160, qr[3]) + expected.delay(160, qr[4]) + expected.delay(160, qr[5]) + expected.delay(160, qr[6]) + self.assertEqual(expected, scheduled) + + def test_c_if_plugin_conversion_with_transpile(self): + """Verify that old format c_if may be converted and scheduled after + transpilation with the plugin.""" + # Patch the test backend with the plugin + with patch.object( + FakeJakarta, + "get_translation_stage_plugin", + return_value="ibm_dynamic_circuits", + create=True, + ): + backend = FakeJakarta() + # Temporary workaround for mock backends. For real backends this is not required. + backend.configuration().basis_gates.append("if_else") + + durations = DynamicCircuitInstructionDurations.from_backend(backend) + pm = PassManager([ALAPScheduleAnalysis(durations), PadDelay(schedule_idle_qubits=True)]) + + qr0 = QuantumRegister(1, name="q") + cr = ClassicalRegister(1, name="c") + qc = QuantumCircuit(qr0, cr) + qc.x(qr0[0]).c_if(cr[0], True) + + qc_transpiled = transpile(qc, backend, initial_layout=[0]) + + scheduled = pm.run(qc_transpiled) + + qr1 = QuantumRegister(7, name="q") + cr = ClassicalRegister(1, name="c") + expected = QuantumCircuit(qr1, cr) + with expected.if_test((cr[0], True)): + expected.x(qr1[0]) + expected.delay(160, qr1[1]) + expected.delay(160, qr1[2]) + expected.delay(160, qr1[3]) + expected.delay(160, qr1[4]) + expected.delay(160, qr1[5]) + expected.delay(160, qr1[6]) + + self.assertEqual(expected, scheduled) + + def test_no_unused_qubits(self): + """Test DD with if_test circuit that unused qubits are untouched and not scheduled. + + This ensures that programs don't have unnecessary information for unused qubits. + Which might hurt performance in later executon stages. + """ + + durations = DynamicCircuitInstructionDurations([("x", None, 200), ("measure", None, 840)]) + pm = PassManager([ALAPScheduleAnalysis(durations), PadDelay()]) + + qc = QuantumCircuit(3, 1) + qc.measure(0, 0) + qc.x(1) + with qc.if_test((0, True)): + qc.x(1) + qc.measure(0, 0) + with qc.if_test((0, True)): + qc.x(0) + qc.x(1) + + scheduled = pm.run(qc) + + dont_use = scheduled.qubits[-1] + for op in scheduled.data: + self.assertNotIn(dont_use, op.qubits) diff --git a/test/unit/transpiler/passes/scheduling/test_utils.py b/test/unit/transpiler/passes/scheduling/test_utils.py new file mode 100644 index 000000000..50cd79ff7 --- /dev/null +++ b/test/unit/transpiler/passes/scheduling/test_utils.py @@ -0,0 +1,65 @@ +# This code is part of Qiskit. +# +# (C) Copyright IBM 2022. +# +# This code is licensed under the Apache License, Version 2.0. You may +# obtain a copy of this license in the LICENSE.txt file in the root directory +# of this source tree or at http://www.apache.org/licenses/LICENSE-2.0. +# +# Any modifications or derivative works of this code must retain this +# copyright notice, and modified files need to carry a notice indicating +# that they have been altered from the originals. + +"""Tests for Qiskit scheduling utilities.""" + +from qiskit_ibm_runtime.transpiler.passes.scheduling.utils import ( + DynamicCircuitInstructionDurations, +) +from .....ibm_test_case import IBMTestCase + + +class TestDynamicCircuitInstructionDurations(IBMTestCase): + """Tests the DynamicCircuitInstructionDurations patching""" + + def test_patch_measure(self): + """Test if schedules circuits with c_if after measure with a common clbit. + See: https://github.com/Qiskit/qiskit-terra/issues/7654""" + + durations = DynamicCircuitInstructionDurations( + [ + ("x", None, 200), + ("sx", (0,), 200), + ("measure", None, 1000), + ("measure", (0, 1), 1200), + ("reset", None, 800), + ] + ) + + self.assertEqual(durations.get("x", (0,)), 200) + self.assertEqual(durations.get("measure", (0,)), 1160) + self.assertEqual(durations.get("measure", (0, 1)), 1360) + self.assertEqual(durations.get("reset", (0,)), 1160) + + short_odd_durations = DynamicCircuitInstructionDurations( + [ + ("sx", (0,), 112), + ("measure", None, 1000), + ("reset", None, 800), + ] + ) + + self.assertEqual(short_odd_durations.get("measure", (0,)), 1224) + self.assertEqual(short_odd_durations.get("reset", (0,)), 1224) + + def test_patch_disable(self): + """Test if schedules circuits with c_if after measure with a common clbit. + See: https://github.com/Qiskit/qiskit-terra/issues/7654""" + + durations = DynamicCircuitInstructionDurations( + [("x", None, 200), ("measure", None, 1000), ("measure", (0, 1), 1200)], + enable_patching=False, + ) + + self.assertEqual(durations.get("x", (0,)), 200) + self.assertEqual(durations.get("measure", (0,)), 1000) + self.assertEqual(durations.get("measure", (0, 1)), 1200)