From ac0d85832227c44f805166e519a08e0df04a95a9 Mon Sep 17 00:00:00 2001 From: Dan Mills <52407433+daniel-mills-cqc@users.noreply.github.com> Date: Mon, 9 Dec 2024 16:47:02 +0000 Subject: [PATCH] feat: Add convex subgraph finding class (#189) * Add convex subgraph finding class * Integrate convex subgraph finder * Add convex subdag tests * Add type annotation and tests * Move to using compiler pass * Add some documentation * Documentaiton * Move away from class for sub dag finding * Add tests of circuit equiv * Complete documentation * Rename and document rebase pass * Support barriers * Correct documentation * Apply suggestions from code review Co-authored-by: Alec Edgington <54802828+cqc-alec@users.noreply.github.com> * Correct coloured_nodes type annotation --------- Co-authored-by: Alec Edgington <54802828+cqc-alec@users.noreply.github.com> --- docs/coherent_pauli_checks.rst | 6 +- qermit/coherent_pauli_checks/__init__.py | 6 +- .../box_clifford_subcircuits.py | 316 +++++++++++++ .../coherent_pauli_checks/clifford_detect.py | 270 ----------- .../coherent_pauli_checks_mitres.py | 6 +- .../monochromatic_convex_subdag.py | 145 ++++++ tests/coherent_pauli_checks_test.py | 429 ++++++++++++++---- 7 files changed, 817 insertions(+), 361 deletions(-) create mode 100644 qermit/coherent_pauli_checks/box_clifford_subcircuits.py delete mode 100644 qermit/coherent_pauli_checks/clifford_detect.py create mode 100644 qermit/coherent_pauli_checks/monochromatic_convex_subdag.py diff --git a/docs/coherent_pauli_checks.rst b/docs/coherent_pauli_checks.rst index 39102b0c..80c488c8 100644 --- a/docs/coherent_pauli_checks.rst +++ b/docs/coherent_pauli_checks.rst @@ -10,4 +10,8 @@ qermit.coherent_pauli_checks .. autoclass:: qermit.coherent_pauli_checks.pauli_sampler.RandomPauliSampler :members: - :show-inheritance: \ No newline at end of file + :show-inheritance: + +.. autofunction:: qermit.coherent_pauli_checks.monochromatic_convex_subdag.get_monochromatic_convex_subdag + +.. autofunction:: qermit.coherent_pauli_checks.box_clifford_subcircuits.BoxClifford \ No newline at end of file diff --git a/qermit/coherent_pauli_checks/__init__.py b/qermit/coherent_pauli_checks/__init__.py index 9deeebb7..faa181b6 100644 --- a/qermit/coherent_pauli_checks/__init__.py +++ b/qermit/coherent_pauli_checks/__init__.py @@ -1,9 +1,5 @@ -from .clifford_detect import ( # noqa:F401 - QermitDAGCircuit, - cpc_rebase_pass, -) from .coherent_pauli_checks_mitres import gen_coherent_pauli_check_mitres -from .pauli_sampler import ( # noqa:F401 +from .pauli_sampler import ( DeterministicXPauliSampler, DeterministicZPauliSampler, OptimalPauliSampler, diff --git a/qermit/coherent_pauli_checks/box_clifford_subcircuits.py b/qermit/coherent_pauli_checks/box_clifford_subcircuits.py new file mode 100644 index 00000000..093c90b1 --- /dev/null +++ b/qermit/coherent_pauli_checks/box_clifford_subcircuits.py @@ -0,0 +1,316 @@ +import networkx as nx # type: ignore +from pytket._tket.unit_id import Qubit +from pytket.circuit import CircBox, Circuit, Command, OpType +from pytket.passes import BasePass, CustomPass + +from .monochromatic_convex_subdag import get_monochromatic_convex_subdag + + +def _command_is_clifford(command: Command) -> bool: + """Check if the given command is Clifford. + + :param command: Command to check. + :return: Boolean value indicating if given command is Clifford. + """ + + # This is only a limited set of gates. + # TODO: This should be expanded. + + if command.op.is_clifford_type(): + return True + + if command.op.type == OpType.Rz: + if command.op.params == [0.5]: + return True + + if command.op.type == OpType.PhasedX: + if command.op.params == [0.5, 0.5]: + return True + + return False + + +def _get_clifford_commands(command_list: list[Command]) -> list[int]: + """Given a list of commands, return a set of indexes of that list + corresponding to those commands which are Clifford gates. + + :param command_list: List of commands in which to search for + Clifford commends. + :return: Indexes in the list which correspond to commands in the + list which are Clifford. + """ + return [ + i for i, command in enumerate(command_list) if _command_is_clifford(command) + ] + + +def _give_nodes_subdag(dag: nx.DiGraph, node_subdag: dict[int, int]) -> list[int]: + """Assign a sub-DAG to all nodes in given dag. Some may already have + an assigned sub-DAG as given and these are preserved. Nodes without an + assigned sub-DAG are given a unique sub-DAG of their own. + + :param dag: Directed acyclic graph. + :param node_subdag: Map from node to sub-DAG for those nodes with + an existing assignment. + :raises Exception: Raised if nodes are not sequential integers. + :return: List of sub-DAGs. List is indexed by node. + """ + + if not sorted(list(dag.nodes)) == [i for i in range(dag.number_of_nodes())]: + raise Exception("The nodes of the given dag must be sequential integers.") + + node_subdag_list = [] + subdag_index = 0 + for node in range(dag.number_of_nodes()): + if node in node_subdag.keys(): + node_subdag_list.append(node_subdag[node]) + else: + while (subdag_index in node_subdag.values()) or ( + subdag_index in node_subdag_list + ): + subdag_index += 1 + node_subdag_list.append(subdag_index) + + return node_subdag_list + + +def _circuit_to_graph(circuit: Circuit) -> tuple[nx.DiGraph, list[Command]]: + """Convert circuit to graph. Nodes correspond to commands, + edges indicate a dependence between the outputs and inputs of + two commands. Node values corresponds to indexes in the returned + list of commands. + + :param circuit: Circuit to convert to a graph. + :return: Tuple of graph and list of commands. Nodes are indexes + in the list of commands. + """ + # Lists the most recent node to act on a particular qubits. If a + # new gate is found to act on that qubit then an edge between the + # node which corresponds to the new gate and the node which + # most recently acted on that qubit is added. This builds a DAG + # showing the temporal order of gates. + node_command = circuit.get_commands() + current_node: dict[Qubit, int] = {} + dag = nx.DiGraph() + for node, command in enumerate(node_command): + dag.add_node(node) + for qubit in command.qubits: + # This if statement is used in case the qubit has not been + # acted on yet. + if qubit in current_node.keys(): + dag.add_edge(current_node[qubit], node) + current_node[qubit] = node + + return dag, node_command + + +def _get_sub_circuit_qubits( + command_list: list[Command], + command_subcircuit: list[int], +) -> dict[int, set[Qubit]]: + """For each subcircuit, get the qubits on which it acts. + + :param command_list: A list of commands. + :param command_subcircuit: The subcircuit to which each command belongs. + :return: A map from the subcircuit to the qubits it act on. + """ + sub_circuit_list = list(set(command_subcircuit)) + sub_circuit_qubits: dict[int, set[Qubit]] = { + sub_circuit: set() for sub_circuit in sub_circuit_list + } + for node, sub_circuit in enumerate(command_subcircuit): + for qubit in command_list[node].qubits: + sub_circuit_qubits[sub_circuit].add(qubit) + + return sub_circuit_qubits + + +def _can_implement( + sub_circuit: int, + command_sub_circuit: list[int], + command_implemented: list[bool], + dag: nx.DiGraph, + node_command: list[Command], +) -> bool: + """True if it is safe to implement a subcircuit. False otherwise. + This will be true if all predecessors of commands in the sub circuit + have been implemented. + + :param sub_circuit: Subcircuit to check. + :param command_sub_circuit: The subcircuit of each command. + :param command_implemented: List with entry for each command indicating if + it has been implemented. + :param dag: Graph giving dependencies between commands. + :param node_command: Command corresponding to each node in the graph. + :return: True if it is safe to implement a subcircuit. False otherwise. + """ + _can_implement = True + for node in range(len(node_command)): + if not command_sub_circuit[node] == sub_circuit: + continue + + for predecessor in dag.predecessors(node): + if command_sub_circuit[predecessor] == sub_circuit: + continue + if not command_implemented[predecessor]: + _can_implement = False + + return _can_implement + + +def _box_clifford_transform(circuit: Circuit) -> Circuit: + """Replace Clifford subcircuits with boxes containing those circuits. + These boxes will have the name "Clifford Subcircuit". + + :param circuit: Circuit whose Clifford subcircuits should be boxed. + :return: Equivalent circuit with subcircuits boxed. + :rtype: Circuit + """ + dag, node_command = _circuit_to_graph(circuit=circuit) + clifford_nodes = _get_clifford_commands(node_command) + + node_subdag = get_monochromatic_convex_subdag( + dag=dag, + coloured_nodes=clifford_nodes, + ) + + node_sub_circuit_list = _give_nodes_subdag(dag=dag, node_subdag=node_subdag) + + sub_circuit_qubits = _get_sub_circuit_qubits( + command_list=node_command, + command_subcircuit=node_sub_circuit_list, + ) + + # List indicating if a command has been implemented + implemented_commands = [False] * len(node_command) + + # Initialise new circuit + clifford_box_circuit = Circuit() + for qubit in circuit.qubits: + clifford_box_circuit.add_qubit(qubit) + for bit in circuit.bits: + clifford_box_circuit.add_bit(bit) + clifford_box_circuit.add_phase(circuit.phase) + + while not all(implemented_commands): + # Search for a subcircuit that it is safe to implement, and + # pick the first one found to be implemented. + not_implemented = [ + node_sub_circuit + for node_sub_circuit, implemented in zip( + node_sub_circuit_list, implemented_commands + ) + if not implemented + ] + sub_circuit_to_implement = None + for sub_circuit in set(not_implemented): + if _can_implement( + sub_circuit=sub_circuit, + command_sub_circuit=node_sub_circuit_list, + command_implemented=implemented_commands, + dag=dag, + node_command=node_command, + ): + sub_circuit_to_implement = sub_circuit + break + + assert sub_circuit_to_implement is not None + + # List the nodes in the chosen sub circuit + node_to_implement_list = [ + node + for node in range(len(node_command)) + if node_sub_circuit_list[node] == sub_circuit_to_implement + ] + assert len(node_to_implement_list) > 0 + + # If the circuit is clifford add it as a circbox + if node_to_implement_list[0] in clifford_nodes: + assert all( + node_to_implement in clifford_nodes + for node_to_implement in node_to_implement_list + ) + + # Empty circuit to contain clifford subcircuit + clifford_subcircuit = Circuit( + n_qubits=len(sub_circuit_qubits[sub_circuit_to_implement]), + name="Clifford Subcircuit", + ) + + # Map from qubits in original circuit to qubits in new + # clifford circuit. + qubit_to_index = { + qubit: i + for i, qubit in enumerate(sub_circuit_qubits[sub_circuit_to_implement]) + } + + # Add all gates to new circuit + for node in node_to_implement_list: + # It is assumed that the commands have no classical bits. + if node_command[node].args != node_command[node].qubits: + raise Exception( + "This Clifford subcircuit contains classical bits." + "This is a bug and should be reported to the developers." + ) + + if node_command[node].op.type == OpType.Barrier: + raise Exception( + "This Clifford subcircuit contains a barrier." + "This is a bug and should be reported to the developers." + ) + + if node_command[node].opgroup is not None: + clifford_subcircuit.add_gate( + Op=node_command[node].op, + args=[ + qubit_to_index[qubit] for qubit in node_command[node].qubits + ], + opgroup=node_command[node].opgroup, + ) + + else: + clifford_subcircuit.add_gate( + Op=node_command[node].op, + args=[ + qubit_to_index[qubit] for qubit in node_command[node].qubits + ], + ) + + implemented_commands[node] = True + + clifford_circ_box = CircBox(clifford_subcircuit) + clifford_box_circuit.add_circbox( + clifford_circ_box, + list(sub_circuit_qubits[sub_circuit_to_implement]), + ) + + # Otherwise, add the gates straight to the circuit + else: + assert len(node_to_implement_list) == 1 + + if node_command[node_to_implement_list[0]].op.type == OpType.Barrier: + clifford_box_circuit.add_barrier( + units=node_command[node_to_implement_list[0]].args, + data=node_command[node_to_implement_list[0]].op.data, # type: ignore + ) + + else: + clifford_box_circuit.add_gate( + node_command[node_to_implement_list[0]].op, + node_command[node_to_implement_list[0]].args, + ) + + implemented_commands[node_to_implement_list[0]] = True + + return clifford_box_circuit + + +def BoxClifford() -> BasePass: + """ + Pass finding clifford subcircuits and wrapping them + in circuit boxed called "Clifford Subcircuit". + + :return: Pass finding clifford subcircuits and wrapping them + in circuit boxed called "Clifford Subcircuit". + """ + return CustomPass(transform=_box_clifford_transform) diff --git a/qermit/coherent_pauli_checks/clifford_detect.py b/qermit/coherent_pauli_checks/clifford_detect.py deleted file mode 100644 index 693e21a7..00000000 --- a/qermit/coherent_pauli_checks/clifford_detect.py +++ /dev/null @@ -1,270 +0,0 @@ -import math -from typing import List, Optional, Union, cast - -import networkx as nx # type: ignore -from pytket import Circuit, OpType, Qubit -from pytket.circuit import CircBox, Command -from pytket.passes import AutoRebase - -clifford_ops = [OpType.CZ, OpType.H, OpType.Z, OpType.S, OpType.X] -non_clifford_ops = [OpType.Rz] - -cpc_rebase_pass = AutoRebase(gateset=set(clifford_ops + non_clifford_ops)) - - -class DAGCommand: - def __init__(self, command: Command) -> None: - self.command = command - - @property - def clifford(self) -> bool: - if self.command.op.is_clifford_type(): - return True - - if self.command.op.type in [OpType.PhasedX, OpType.Rz]: - return all( - math.isclose(param % 0.5, 0) or math.isclose(param % 0.5, 0.5) - for param in self.command.op.params - ) - - return False - - -class QermitDAGCircuit(nx.DiGraph): - def __init__(self, circuit: Circuit, cutoff: Optional[int] = None) -> None: - # TODO: There are other things to be saved, like the phase. - - super().__init__() - - self.node_command = [DAGCommand(command) for command in circuit.get_commands()] - self.qubits = circuit.qubits - self.bits = circuit.bits - self.cutoff = cutoff - - # Lists the most recent node to act on a particular qubits. If a - # new gate is found to act on that qubit then an edge between the - # node which corresponds to the new gate and the node which - # most recently acted on that qubit is added. This builds a DAG - # showing the temporal order of gates. - current_node: dict[Qubit, int] = {} - for node, command in enumerate(self.node_command): - self.add_node(node) - for qubit in command.command.qubits: - # This if statement is used in case the qubit has not been - # acted on yet. - if qubit in current_node.keys(): - self.add_edge(current_node[qubit], node) - current_node[qubit] = node - - def get_clifford_subcircuits(self) -> List[int]: - # a list indicating the clifford subcircuit to which a command belongs. - node_sub_circuit: List[Union[int, None]] = [None] * self.number_of_nodes() - next_sub_circuit_id = 0 - - # Iterate through all commands and check if their neighbours should - # be added to the same clifford subcircuit. - for node, command in enumerate(self.node_command): - # If the command is not in a clifford sub circuit, start a - # new one and add it to that new one, - if node_sub_circuit[node] is None: - node_sub_circuit[node] = next_sub_circuit_id - next_sub_circuit_id += 1 - - # Ignore the command if it is not clifford - if not command.clifford: - continue - - # For all the neighbours of the command being considered, add that - # neighbour to the same clifford subcircuit if no non-clifford - # gates prevent this from happening. - for neighbour_id in self.neighbors(node): - # Do not add the neighbour if it is not a clifford gate. - if not self.node_command[neighbour_id].clifford: - continue - - # Do not add the neighbour if it is already part of a clifford - # sub circuit. - # TODO: This should be removed. In particular we can include - # the current node in the hyperedge if there are no - # non-clifford blocking us from doing so. - if node_sub_circuit[neighbour_id] is not None: - continue - - # list all of the commands in the circuit which belong to - # the same sub circuit as the one being considered - same_sub_circuit_node_list = [ - i - for i, sub_circuit in enumerate(node_sub_circuit) - if sub_circuit == node_sub_circuit[node] - ] - - # Check if any of the paths in the circuit from the neighbour - # to other commands in the clifford circuit pass through - # a different Clifford subcircuit. If nodes on the path - # belonged to another clifford subcircuit then it would - # not be possible to build the circuit by applying - # sub circuits sequentially. - same_clifford_circuit = True - for same_sub_circuit_node in same_sub_circuit_node_list: - # I'm allowing to pass cutoff, but that should - # not be allowed. In particular paths of arbitrary - # lengths should be checked in practice. - # however all_simple_paths is quite slow otherwise as it - # spends a lot of time looking for paths that don't exist. - for path in nx.all_simple_paths( - self, same_sub_circuit_node, neighbour_id, cutoff=self.cutoff - ): - if not all( - node_sub_circuit[path_node] == node_sub_circuit[node] - for path_node in path[:-1] - ): - same_clifford_circuit = False - break - - # add the neighbour if no paths in the circuit to other - # commands in the clifford sub circuit pass through - # non clifford sub circuits. - if same_clifford_circuit: - node_sub_circuit[neighbour_id] = node_sub_circuit[node] - - if any( - sub_circuit is None for sub_circuit in node_sub_circuit - ): # pragma: no cover - raise Exception("Some nodes have been left unassigned.") - - return cast(List[int], node_sub_circuit) - - # TODO: I'm not sure if this should return a circuit, or changes this - # QermitDagCircuit in place - def to_clifford_subcircuit_boxes(self) -> Circuit: - # TODO: It could be worth insisting that the given circuit does not - # include any boxes called 'Clifford Subcircuit'. i.e. that the - # circuit is 'clean'. - - node_sub_circuit_list = self.get_clifford_subcircuits() - sub_circuit_qubits = self.get_sub_circuit_qubits(node_sub_circuit_list) - - # List indicating if a command has been implemented - implemented_commands = [False for _ in self.nodes()] - - # Initialise new circuit - clifford_box_circuit = Circuit() - for qubit in self.qubits: - clifford_box_circuit.add_qubit(qubit) - for bit in self.bits: - clifford_box_circuit.add_bit(bit) - - while not all(implemented_commands): - # Search for a subcircuit that it is safe to implement, and - # pick the first one found to be implemented. - not_implemented = [ - node_sub_circuit - for node_sub_circuit, implemented in zip( - node_sub_circuit_list, implemented_commands - ) - if not implemented - ] - sub_circuit_to_implement = None - for sub_circuit in set(not_implemented): - if self.can_implement( - sub_circuit, node_sub_circuit_list, implemented_commands - ): - sub_circuit_to_implement = sub_circuit - break - assert sub_circuit_to_implement is not None - - # List the nodes in the chosen sub circuit - node_to_implement_list = [ - node - for node in self.nodes() - if node_sub_circuit_list[node] == sub_circuit_to_implement - ] - assert len(node_to_implement_list) > 0 - - # If the circuit is clifford add it as a circbox - if self.node_command[node_to_implement_list[0]].clifford: - # Empty circuit to contain clifford subcircuit - clifford_subcircuit = Circuit( - n_qubits=len(sub_circuit_qubits[sub_circuit_to_implement]), - name="Clifford Subcircuit", - ) - - # Map from qubits in original circuit to qubits in new - # clifford circuit. - qubit_to_index = { - qubit: i - for i, qubit in enumerate( - sub_circuit_qubits[sub_circuit_to_implement] - ) - } - - # Add all gates to new circuit - for node in node_to_implement_list: - clifford_subcircuit.add_gate( - self.node_command[node].command.op, - [ - qubit_to_index[qubit] - for qubit in self.node_command[node].command.args - ], - ) - implemented_commands[node] = True - - clifford_circ_box = CircBox(clifford_subcircuit) - clifford_box_circuit.add_circbox( - clifford_circ_box, - list(sub_circuit_qubits[sub_circuit_to_implement]), - ) - - # Otherwise, add the gates straight to the circuit - else: - for node in node_to_implement_list: - clifford_box_circuit.add_gate( - self.node_command[node].command.op, - self.node_command[node].command.args, - ) - implemented_commands[node] = True - - return clifford_box_circuit - - def get_sub_circuit_qubits( - self, node_sub_circuit: list[int] - ) -> dict[int, set[Qubit]]: - """Creates a dictionary from the clifford sub circuit to the qubits - which it covers. - - :param node_sub_circuit: List identifying to which clifford sub circuit - each command belongs. - :type node_sub_circuit: List[Int] - :return: Dictionary from clifford sub circuit index to the qubits - the circuit covers. - :rtype: Dict[Int, List[Quibt]] - """ - - sub_circuit_list = list(set(node_sub_circuit)) - sub_circuit_qubits: dict[int, set[Qubit]] = { - sub_circuit: set() for sub_circuit in sub_circuit_list - } - for node, sub_circuit in enumerate(node_sub_circuit): - for qubit in self.node_command[node].command.qubits: - sub_circuit_qubits[sub_circuit].add(qubit) - - return sub_circuit_qubits - - def can_implement( - self, - sub_circuit: int, - node_sub_circuit_list: List[int], - implemented_commands: List[bool], - ) -> bool: - can_implement = True - for node in self.nodes: - if not node_sub_circuit_list[node] == sub_circuit: - continue - - for predecessor in self.predecessors(node): - if node_sub_circuit_list[predecessor] == sub_circuit: - continue - if not implemented_commands[predecessor]: - can_implement = False - - return can_implement diff --git a/qermit/coherent_pauli_checks/coherent_pauli_checks_mitres.py b/qermit/coherent_pauli_checks/coherent_pauli_checks_mitres.py index b87b2456..c27b8cab 100644 --- a/qermit/coherent_pauli_checks/coherent_pauli_checks_mitres.py +++ b/qermit/coherent_pauli_checks/coherent_pauli_checks_mitres.py @@ -4,7 +4,7 @@ from pytket.passes import DecomposeBoxes from qermit import CircuitShots, MitRes, MitTask, TaskGraph -from qermit.coherent_pauli_checks.clifford_detect import QermitDAGCircuit +from qermit.coherent_pauli_checks.box_clifford_subcircuits import BoxClifford from qermit.postselection.postselect_manager import PostselectMgr from qermit.postselection.postselect_mitres import gen_postselect_task @@ -29,8 +29,8 @@ def task(_, circ_shots_list: List[CircuitShots]) -> Tuple[List[CircuitShots]]: cliff_circ_shots_list = [] for circ_shots in circ_shots_list: - dag_circuit = QermitDAGCircuit(circuit=circ_shots.Circuit) - cliff_circ = dag_circuit.to_clifford_subcircuit_boxes() + cliff_circ = circ_shots.Circuit.copy() + BoxClifford().apply(cliff_circ) cliff_circ_shots_list.append( CircuitShots( Circuit=cliff_circ, diff --git a/qermit/coherent_pauli_checks/monochromatic_convex_subdag.py b/qermit/coherent_pauli_checks/monochromatic_convex_subdag.py new file mode 100644 index 00000000..f93803f3 --- /dev/null +++ b/qermit/coherent_pauli_checks/monochromatic_convex_subdag.py @@ -0,0 +1,145 @@ +from itertools import combinations +from typing import Any + +import networkx as nx # type: ignore + + +def _subdag_nodes(subdag: int, node_subdag: dict[Any, int]) -> list[Any]: + """Get all nodes in a given sub-DAG. + + :param subdag: The sub-DAG whose nodes should be retrieved. + :param node_subdag: Map from node to the sub-DAG to which the + node belongs. + :return: List of nodes belonging to given sub-DAG. + """ + return [node for node, s in node_subdag.items() if s == subdag] + + +def _subdag_predecessors( + dag: nx.DiGraph, subdag: int, node_subdag: dict[Any, int] +) -> list[Any]: + """Retrieve all nodes not in given sub-DAG with successors in sub-DAG. + + :param dag: Directed Acyclic Graph. + :param subdag: Sub-DAG to retrieve predecessors of. + :param node_subdag: Map from node to the sub-DAG to which it belongs. + :return: Nodes with successors in given sub-DAG. + """ + subdag_nodes = _subdag_nodes(subdag=subdag, node_subdag=node_subdag) + return sum( + [ + [ + predecessor + for predecessor in dag.predecessors(node) + # Exclude nodes in subdag. + if predecessor not in subdag_nodes + ] + for node in subdag_nodes + ], + start=[], + ) + + +def _subdag_successors( + dag: nx.DiGraph, subdag: int, node_subdag: dict[Any, int] +) -> list[Any]: + """Retrieve all nodes not in given sub-DAG with predecessors in sub-DAG. + + :param dag: Directed Acyclic Graph. + :param subdag: Sub-DAG to retrieve successors of. + :param node_subdag: Map from node to the sub-DAG to which it belongs. + :return: Nodes with predecessors in given sub-DAG. + """ + subdag_nodes = _subdag_nodes(subdag=subdag, node_subdag=node_subdag) + return sum( + [ + [ + successor + for successor in dag.successors(node) + # Exclude nodes in subdag. + if successor not in subdag_nodes + ] + for node in subdag_nodes + ], + start=[], + ) + + +def get_monochromatic_convex_subdag( + dag: nx.DiGraph, coloured_nodes: list[Any] +) -> dict[Any, int]: + """Retrieve assignment of coloured nodes to sub-DAGs. + The assignment aims to minimise the number of sub-DAGs. + + :param dag: Directed Acyclic Graph. + :param coloured_nodes: The nodes which are coloured. + :return: Map from node to the sub-DAG to which it belongs. + """ + + node_descendants = {node: nx.descendants(dag, node) for node in dag.nodes} + for node in node_descendants.keys(): + node_descendants[node].add(node) + + def _can_merge( + dag: nx.DiGraph, + subdag_one: int, + subdag_two: int, + node_subdag: dict[Any, int], + ) -> bool: + """Determine if two sub-DAGs can be merged. This will be the case if + there are no paths between predecessors of one sub-DAG to successors of + the other. + + :param dag: Directed Acyclic Graph. + :param subdag_one: First sub-DAG. + :param subdag_two: Second sub-DAG. + :param node_subdag: Map from node to the sub-DAG to which it belongs. + :return: Boolean value indicating if the two sub-DAGs can be merged. + """ + + subdag_two_pred = _subdag_predecessors( + dag=dag, subdag=subdag_two, node_subdag=node_subdag + ) + for subdag_one_succ in _subdag_successors( + dag=dag, subdag=subdag_one, node_subdag=node_subdag + ): + if any( + descendant in subdag_two_pred + for descendant in node_descendants[subdag_one_succ] + ): + return False + + subgraph_one_pred = _subdag_predecessors( + dag=dag, subdag=subdag_one, node_subdag=node_subdag + ) + for subdag_two_succ in _subdag_successors( + dag=dag, subdag=subdag_two, node_subdag=node_subdag + ): + if any( + descendant in subgraph_one_pred + for descendant in node_descendants[subdag_two_succ] + ): + return False + + return True + + node_subdag = {node: i for i, node in enumerate(coloured_nodes)} + + subgraph_merged = True + while subgraph_merged: + subgraph_merged = False + + # Try to merge all pairs of sub-DAGs + for subdag_one, subdag_two in combinations(set(node_subdag.values()), 2): + if _can_merge( + dag=dag, + subdag_one=subdag_one, + subdag_two=subdag_two, + node_subdag=node_subdag, + ): + for node in _subdag_nodes(subdag=subdag_two, node_subdag=node_subdag): + node_subdag[node] = subdag_one + subgraph_merged = True + break + + return node_subdag diff --git a/tests/coherent_pauli_checks_test.py b/tests/coherent_pauli_checks_test.py index 0e9c29b6..b2cc1777 100644 --- a/tests/coherent_pauli_checks_test.py +++ b/tests/coherent_pauli_checks_test.py @@ -1,5 +1,6 @@ from collections import Counter +import networkx as nx import numpy as np import numpy.random import pytest @@ -15,11 +16,21 @@ DeterministicZPauliSampler, OptimalPauliSampler, PauliSampler, - QermitDAGCircuit, RandomPauliSampler, - cpc_rebase_pass, gen_coherent_pauli_check_mitres, ) +from qermit.coherent_pauli_checks.box_clifford_subcircuits import ( + BoxClifford, + _circuit_to_graph, + _command_is_clifford, + _get_clifford_commands, + _give_nodes_subdag, +) +from qermit.coherent_pauli_checks.monochromatic_convex_subdag import ( + _subdag_predecessors, + _subdag_successors, + get_monochromatic_convex_subdag, +) from qermit.noise_model import ( Direction, ErrorDistribution, @@ -35,6 +46,205 @@ ) +def test_boxing_barrier_and_circbox(): + circuit = Circuit(3, 3) + circuit.X(0) + circuit.Z(1) + + circuit.CCX(0, 2, 1) + + circ_box_circuit = Circuit(3).X(0).Z(1).Y(2) + circ_box = CircBox(circ_box_circuit) + circuit.add_circbox( + circ_box, + circuit.qubits, + ) + + circuit.X(2, condition=circuit.bits[0] & circuit.bits[1]) + + circuit.add_barrier( + [circuit.qubits[0]] + [circuit.qubits[2]] + circuit.bits, + data="my data", + ) + circuit.Y(2) + + circ_box_circuit = Circuit(2).X(0).Rx(0.1, 0) + circ_box = CircBox(circ_box_circuit) + circuit.add_circbox( + circ_box, + [circuit.qubits[0], circuit.qubits[1]], + ) + + circuit.Z(0) + + circuit.add_barrier([circuit.qubits[1]] + circuit.bits) + + circuit.measure_all() + + BoxClifford().apply(circuit) + + ideal_circuit = Circuit(3, 3) + + circ_box_circuit = Circuit(2, name="Clifford Subcircuit").X(1).Z(0) + circ_box = CircBox(circ_box_circuit) + ideal_circuit.add_circbox( + circ_box, + [ideal_circuit.qubits[1], ideal_circuit.qubits[0]], + ) + + ideal_circuit.CCX(0, 2, 1) + + circ_box_circuit = Circuit(3).X(0).Z(1).Y(2) + circ_box = CircBox(circ_box_circuit) + ideal_circuit.add_circbox( + circ_box, + ideal_circuit.qubits, + ) + + ideal_circuit.X(2, condition=ideal_circuit.bits[0] & ideal_circuit.bits[1]) + + ideal_circuit.add_barrier( + [ideal_circuit.qubits[0]] + [ideal_circuit.qubits[2]] + ideal_circuit.bits, + data="my data", + ) + + circ_box_circuit = Circuit(2).X(0).Rx(0.1, 0) + circ_box = CircBox(circ_box_circuit) + ideal_circuit.add_circbox( + circ_box, + [ideal_circuit.qubits[0], ideal_circuit.qubits[1]], + ) + + circ_box_circuit = Circuit(2, name="Clifford Subcircuit").Z(1).Y(0) + circ_box = CircBox(circ_box_circuit) + ideal_circuit.add_circbox( + circ_box, + [ideal_circuit.qubits[2], ideal_circuit.qubits[0]], + ) + + ideal_circuit.add_barrier([ideal_circuit.qubits[1]] + ideal_circuit.bits) + + ideal_circuit.measure_all() + + assert ideal_circuit == circuit + + +def test__command_is_clifford(): + qubit_0 = Qubit(name="my_qubit_0", index=0) + qubit_1 = Qubit(name="my_qubit_0", index=2) + qubit_2 = Qubit(name="my_qubit_1", index=0) + qubit_3 = Qubit(name="my_qubit_1", index=1) + + bit_0 = Bit(name="my_bit_0", index=0) + bit_1 = Bit(name="my_bit_0", index=2) + bit_2 = Bit(name="my_bit_1", index=0) + + circuit = Circuit() + + circuit.add_qubit(qubit_0) + circuit.add_qubit(qubit_1) + circuit.add_qubit(qubit_2) + circuit.add_qubit(qubit_3) + + circuit.add_bit(bit_0) + circuit.add_bit(bit_1) + circuit.add_bit(bit_2) + + circuit.CZ(qubit_0, qubit_1) + circuit.X(qubit_2, condition=bit_2) + circuit.Y(qubit_3) + circuit.PhasedX(0.5, 0.5, qubit_1) + circuit.CX(qubit_2, qubit_0, condition=bit_1) + circuit.add_c_and(arg0_in=bit_0, arg1_in=bit_2, arg_out=bit_1) + circuit.Z(qubit_1, condition=bit_0) + circuit.Rz(0.5, qubit_2) + circuit.PhasedX(0.5, 0.5, qubit_2, condition=bit_0) + circuit.Rz(0.5, qubit_2, condition=bit_2) + circuit.H(qubit_1) + circuit.Rz(0.55, qubit_2) + circuit.PhasedX(0.5, 0.55, qubit_2) + + command_list = circuit.get_commands() + + assert command_list[0].op.type == OpType.CZ + assert _command_is_clifford(command_list[0]) + + assert command_list[1].op.type == OpType.Conditional + assert not _command_is_clifford(command_list[1]) + + assert command_list[2].op.type == OpType.Y + assert _command_is_clifford(command_list[2]) + + assert command_list[3].op.type == OpType.Conditional + assert not _command_is_clifford(command_list[3]) + + assert command_list[4].op.type == OpType.PhasedX + assert _command_is_clifford(command_list[4]) + + assert command_list[5].op.type == OpType.ExplicitPredicate + assert not _command_is_clifford(command_list[5]) + + assert command_list[6].op.type == OpType.Conditional + assert not _command_is_clifford(command_list[6]) + + assert command_list[7].op.type == OpType.Rz + assert _command_is_clifford(command_list[7]) + + assert command_list[8].op.type == OpType.H + assert _command_is_clifford(command_list[8]) + + assert command_list[9].op.type == OpType.Conditional + assert not _command_is_clifford(command_list[9]) + + assert command_list[10].op.type == OpType.Conditional + assert not _command_is_clifford(command_list[10]) + + assert command_list[11].op.type == OpType.Rz + assert not _command_is_clifford(command_list[11]) + + assert command_list[12].op.type == OpType.PhasedX + assert not _command_is_clifford(command_list[12]) + + +def test_monochromatic_convex_subdag(): + dag = nx.DiGraph() + dag.add_edges_from([(1, 2), (1, 3), (2, 4)]) + + node_descendants = {node: nx.descendants(dag, node) for node in dag.nodes} + for node in node_descendants.keys(): + node_descendants[node].add(node) + + assert _subdag_successors( + dag=dag, subdag=0, node_subdag={1: 0, 2: 0, 3: 0, 4: 3} + ) == [4] + assert ( + _subdag_successors(dag=dag, subdag=3, node_subdag={1: 0, 2: 0, 3: 0, 4: 3}) + == [] + ) + assert ( + _subdag_predecessors(dag=dag, subdag=0, node_subdag={1: 0, 2: 0, 3: 0, 4: 3}) + == [] + ) + assert _subdag_predecessors( + dag=dag, subdag=3, node_subdag={1: 0, 2: 0, 3: 0, 4: 3} + ) == [2] + + assert get_monochromatic_convex_subdag( + dag=dag, + coloured_nodes=[1, 2], + ) == {1: 0, 2: 0} + + assert get_monochromatic_convex_subdag( + dag=dag, + coloured_nodes=[1, 4], + ) == {1: 0, 4: 1} + + assert get_monochromatic_convex_subdag( + dag=dag, + coloured_nodes=[2, 3], + ) == {2: 0, 3: 0} + + def test_two_clifford_boxes() -> None: cx_error_distribution = ErrorDistribution( rng=np.random.default_rng(), @@ -273,14 +483,37 @@ def test_decompose_clifford_subcircuit_box(): assert dag_circ == ideal_circ -def test_get_clifford_subcircuits(): +def test__give_nodes_subdag(): circ = Circuit(3).CZ(0, 1).H(1).Z(1).CZ(1, 0) - cliff_circ = QermitDAGCircuit(circ) - assert cliff_circ.get_clifford_subcircuits() == [0, 0, 0, 0] + dag, node_command = _circuit_to_graph(circuit=circ) + clifford_nodes = _get_clifford_commands(node_command) + + node_subdag = get_monochromatic_convex_subdag( + dag=dag, + coloured_nodes=clifford_nodes, + ) + + assert _give_nodes_subdag(dag=dag, node_subdag=node_subdag) == [0, 0, 0, 0] circ = Circuit(3).CZ(1, 2).H(2).Z(1).CZ(0, 1).H(1).CZ(1, 0).Z(1).CZ(1, 2) - cliff_circ = QermitDAGCircuit(circ) - assert cliff_circ.get_clifford_subcircuits() == [0, 0, 0, 0, 0, 0, 0, 0] + dag, node_command = _circuit_to_graph(circuit=circ) + clifford_nodes = _get_clifford_commands(node_command) + + node_subdag = get_monochromatic_convex_subdag( + dag=dag, + coloured_nodes=clifford_nodes, + ) + + assert _give_nodes_subdag(dag=dag, node_subdag=node_subdag) == [ + 0, + 0, + 0, + 0, + 0, + 0, + 0, + 0, + ] circ = ( Circuit(3) @@ -294,15 +527,37 @@ def test_get_clifford_subcircuits(): .Z(1) .CZ(1, 2) ) - cliff_circ = QermitDAGCircuit(circ) - assert cliff_circ.get_clifford_subcircuits() == [0, 1, 0, 2, 2, 3, 4, 4, 4] + dag, node_command = _circuit_to_graph(circuit=circ) + clifford_nodes = _get_clifford_commands(node_command) + + node_subdag = get_monochromatic_convex_subdag( + dag=dag, + coloured_nodes=clifford_nodes, + ) + + assert _give_nodes_subdag(dag=dag, node_subdag=node_subdag) == [ + 0, + 1, + 0, + 2, + 2, + 3, + 4, + 4, + 4, + ] def test_add_pauli_checks(): - circ = Circuit(3).H(1).CX(1, 0) - cpc_rebase_pass.apply(circ) - cliff_circ = QermitDAGCircuit(circ) - boxed_circ = cliff_circ.to_clifford_subcircuit_boxes() + original_circuit = Circuit(3).H(1).H(0).CZ(1, 0).H(0) + boxed_circ = original_circuit.copy() + BoxClifford().apply(boxed_circ) + + boxed_circ_copy = boxed_circ.copy() + + DecomposeBoxes().apply(boxed_circ_copy) + assert boxed_circ_copy == original_circuit + circuit, _ = DeterministicZPauliSampler().add_pauli_checks_to_circbox( circuit=boxed_circ, ) @@ -311,73 +566,67 @@ def test_add_pauli_checks(): ideal_circ = Circuit(3) ancilla_0 = Qubit(name="ancilla", index=0) - ancilla_1 = Qubit(name="ancilla", index=1) - - ancilla_measure_0 = Bit(name="ancilla_measure", index=0) - ancilla_measure_1 = Bit(name="ancilla_measure", index=1) - ideal_circ.add_qubit(ancilla_0) - ideal_circ.add_qubit(ancilla_1) + ancilla_measure_0 = Bit(name="ancilla_measure", index=0) ideal_circ.add_bit(id=ancilla_measure_0) - ideal_circ.add_bit(id=ancilla_measure_1) - ideal_circ.add_barrier([ideal_circ.qubits[3], ancilla_0]) + ideal_circ.add_barrier([ideal_circ.qubits[2], ideal_circ.qubits[1], ancilla_0]) ideal_circ.H(ancilla_0, opgroup="ancilla superposition") ideal_circ.CZ( control_qubit=ancilla_0, - target_qubit=ideal_circ.qubits[3], + target_qubit=ideal_circ.qubits[1], opgroup="pauli check", ) - ideal_circ.add_barrier([ideal_circ.qubits[3], ancilla_0]) - - ideal_circ.H(ideal_circ.qubits[3]) - - ideal_circ.add_barrier([ideal_circ.qubits[3], ancilla_0]) - ideal_circ.CX( + ideal_circ.CZ( control_qubit=ancilla_0, - target_qubit=ideal_circ.qubits[3], + target_qubit=ideal_circ.qubits[2], opgroup="pauli check", ) - ideal_circ.H(ancilla_0, opgroup="ancilla superposition") - ideal_circ.add_barrier([ideal_circ.qubits[3], ancilla_0]) + ideal_circ.add_barrier([ideal_circ.qubits[2], ideal_circ.qubits[1], ancilla_0]) + + ideal_circ.H(ideal_circ.qubits[2]) + + ideal_circ.H(ideal_circ.qubits[1]) + ideal_circ.CZ(control_qubit=ideal_circ.qubits[2], target_qubit=ideal_circ.qubits[1]) + ideal_circ.H(ideal_circ.qubits[1]) - ideal_circ.add_barrier([ideal_circ.qubits[3], ideal_circ.qubits[2], ancilla_1]) - ideal_circ.H(ancilla_1, opgroup="ancilla superposition") + ideal_circ.add_barrier([ideal_circ.qubits[2], ideal_circ.qubits[1], ancilla_0]) ideal_circ.CZ( - control_qubit=ancilla_1, - target_qubit=ideal_circ.qubits[2], + control_qubit=ancilla_0, + target_qubit=ideal_circ.qubits[1], opgroup="pauli check", ) - ideal_circ.CZ( - control_qubit=ancilla_1, - target_qubit=ideal_circ.qubits[3], + ideal_circ.CX( + control_qubit=ancilla_0, + target_qubit=ideal_circ.qubits[1], opgroup="pauli check", ) - ideal_circ.add_barrier([ideal_circ.qubits[3], ideal_circ.qubits[2], ancilla_1]) - - ideal_circ.H(ideal_circ.qubits[2]) - ideal_circ.CZ(control_qubit=ideal_circ.qubits[3], target_qubit=ideal_circ.qubits[2]) - ideal_circ.H(ideal_circ.qubits[2]) - - ideal_circ.add_barrier([ideal_circ.qubits[3], ideal_circ.qubits[2], ancilla_1]) ideal_circ.CZ( - control_qubit=ancilla_1, + control_qubit=ancilla_0, target_qubit=ideal_circ.qubits[2], opgroup="pauli check", ) - ideal_circ.H(ancilla_1, opgroup="ancilla superposition") - ideal_circ.add_barrier([ideal_circ.qubits[3], ideal_circ.qubits[2], ancilla_1]) + ideal_circ.CX( + control_qubit=ancilla_0, + target_qubit=ideal_circ.qubits[2], + opgroup="pauli check", + ) + ideal_circ.H(ancilla_0, opgroup="ancilla superposition") + ideal_circ.add_barrier([ideal_circ.qubits[2], ideal_circ.qubits[1], ancilla_0]) ideal_circ.Measure(ancilla_0, ancilla_measure_0) - ideal_circ.Measure(ancilla_1, ancilla_measure_1) assert ideal_circ == circuit - circ = Circuit(2).H(0).CX(1, 0).X(1).CX(1, 0) - cpc_rebase_pass.apply(circ) - cliff_circ = QermitDAGCircuit(circ) - boxed_circ = cliff_circ.to_clifford_subcircuit_boxes() + original_circuit = Circuit(2).H(0).H(0).CZ(1, 0).H(0).X(1).H(0).CZ(1, 0).H(0) + boxed_circ = original_circuit.copy() + BoxClifford().apply(boxed_circ) + + decomposed_boxed_circ = boxed_circ.copy() + DecomposeBoxes().apply(decomposed_boxed_circ) + assert decomposed_boxed_circ == original_circuit + circuit, _ = DeterministicZPauliSampler().add_pauli_checks_to_circbox( circuit=boxed_circ, ) @@ -427,35 +676,33 @@ def test_add_pauli_checks(): assert ideal_circ == circuit -def test_simple_non_minimal_example(): - # Note that this is a simple example of where the current implementation - # is not minimal. The whole think is a relatively easy to identify - # Clifford circuit. - +def test_simple_example(): clifford_circuit = Circuit(3).CZ(0, 1).X(2).X(0).CZ(0, 2).CZ(1, 2) - dag_circuit = QermitDAGCircuit(clifford_circuit) - assert dag_circuit.get_clifford_subcircuits() == [0, 1, 0, 1, 1] + dag, node_command = _circuit_to_graph(circuit=clifford_circuit) + clifford_nodes = _get_clifford_commands(node_command) + + node_subdag = get_monochromatic_convex_subdag( + dag=dag, + coloured_nodes=clifford_nodes, + ) + + assert _give_nodes_subdag(dag=dag, node_subdag=node_subdag) == [0, 0, 0, 0, 0] def test_5q_random_clifford(): rng = numpy.random.default_rng(seed=0) + clifford_circuit = random_clifford_circ(n_qubits=5, rng=rng) - cpc_rebase_pass.apply(clifford_circuit) - dag_circuit = QermitDAGCircuit(clifford_circuit) - boxed_clifford_circuit = dag_circuit.to_clifford_subcircuit_boxes() - pauli_sampler = RandomPauliSampler(rng=rng, n_checks=2) - pauli_sampler.add_pauli_checks_to_circbox(circuit=boxed_clifford_circuit) + boxed_clifford_circuit = clifford_circuit.copy() + BoxClifford().apply(boxed_clifford_circuit) + decomposed_boxed_clifford_circuit = boxed_clifford_circuit.copy() + DecomposeBoxes().apply(decomposed_boxed_clifford_circuit) -@pytest.mark.skip( - reason="This test passes, but the functionality is incorrect. In particular there is a H in the middle which is identified as Clifford but which has no checks added." -) -def test_2q_random_clifford(): - clifford_circuit = random_clifford_circ(n_qubits=5, seed=0) - cpc_rebase_pass.apply(clifford_circuit) - dag_circuit = QermitDAGCircuit(clifford_circuit) - pauli_sampler = RandomPauliSampler(seed=0) - dag_circuit.add_pauli_checks(pauli_sampler=pauli_sampler) + assert decomposed_boxed_clifford_circuit == clifford_circuit + + pauli_sampler = RandomPauliSampler(rng=rng, n_checks=2) + pauli_sampler.add_pauli_checks_to_circbox(circuit=boxed_clifford_circuit) def test_CZ_circuit_with_phase(): @@ -463,8 +710,14 @@ def test_CZ_circuit_with_phase(): # global phase which needs to be bumped to the control. original_circuit = Circuit(2).CZ(0, 1).measure_all() - dag_circuit = QermitDAGCircuit(original_circuit) - boxed_original_circuit = dag_circuit.to_clifford_subcircuit_boxes() + boxed_original_circuit = original_circuit.copy() + BoxClifford().apply(boxed_original_circuit) + + decomposed_boxed_original_circuit = boxed_original_circuit.copy() + DecomposeBoxes().apply(decomposed_boxed_original_circuit) + + assert decomposed_boxed_original_circuit == original_circuit + pauli_sampler = DeterministicXPauliSampler() pauli_checks_circuit, _ = pauli_sampler.add_pauli_checks_to_circbox( circuit=boxed_original_circuit, @@ -516,8 +769,10 @@ def test_to_clifford_subcircuits(): .Z(1) .CZ(1, 2) ) - dag_circuit = QermitDAGCircuit(orig_circuit) - clifford_box_circuit = dag_circuit.to_clifford_subcircuit_boxes() + # dag_circuit = QermitDAGCircuit(orig_circuit) + # clifford_box_circuit = dag_circuit.to_clifford_subcircuit_boxes() + clifford_box_circuit = orig_circuit.copy() + BoxClifford().apply(clifford_box_circuit) DecomposeBoxes().apply(clifford_box_circuit) assert clifford_box_circuit == orig_circuit @@ -561,8 +816,13 @@ def test_optimal_pauli_sampler(): noise_model=noise_model, n_checks=2, ) - dag_circ = QermitDAGCircuit(cliff_circ) - boxed_cliff_circ = dag_circ.to_clifford_subcircuit_boxes() + boxed_cliff_circ = cliff_circ.copy() + BoxClifford().apply(boxed_cliff_circ) + + decomposed_boxed_cliff_circ = boxed_cliff_circ.copy() + DecomposeBoxes().apply(decomposed_boxed_cliff_circ) + assert decomposed_boxed_cliff_circ == cliff_circ + pauli_sampler.add_pauli_checks_to_circbox(circuit=boxed_cliff_circ) @@ -584,8 +844,13 @@ def sample(self, circ, **kwargs): ) ] - dag_circ = QermitDAGCircuit(cliff_circ) - boxed_cliff_circ = dag_circ.to_clifford_subcircuit_boxes() + boxed_cliff_circ = cliff_circ.copy() + BoxClifford().apply(boxed_cliff_circ) + + decomposed_boxed_cliff_circ = boxed_cliff_circ.copy() + DecomposeBoxes().apply(decomposed_boxed_cliff_circ) + assert decomposed_boxed_cliff_circ == cliff_circ + pauli_sampler = DeterministicPauliSampler() pauli_check_circ, _ = pauli_sampler.add_pauli_checks_to_circbox( circuit=boxed_cliff_circ,