Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

implement swap update algorithm for adjacency constraint resolution when placing circuits on hardware #146

Merged
merged 7 commits into from
Mar 2, 2021
193 changes: 193 additions & 0 deletions recirq/quantum_chess/mcpe_utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,193 @@
# Copyright 2021 Google
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Utilities related to the maximum consecutive positive effect (mcpe) heuristic
cost function.

These are necessary for implementing the swap-based update algorithm described
in the paper 'A Dynamic Look-Ahead Heuristic for the Qubit Mapping Problem of
NISQ Computers'
(https://ieeexplore.ieee.org/abstract/document/8976109).
"""
from collections import defaultdict, deque
from typing import Callable, Dict, Generator, Iterable, List, Optional, Set, Tuple

import cirq


def manhattan_dist(q1: cirq.GridQubit, q2: cirq.GridQubit) -> int:
"""Returns the Manhattan distance between two GridQubits.

On grid devices this is the shortest path length between the two qubits.
"""
return abs(q1.row - q2.row) + abs(q1.col - q2.col)
weinstein marked this conversation as resolved.
Show resolved Hide resolved


def swap_map_fn(q1: cirq.Qid, q2: cirq.Qid) -> Callable[[cirq.Qid], cirq.Qid]:
"""Returns a function which applies the effect of swapping two qubits."""
swaps = {q1: q2, q2: q1}
return lambda q: swaps.get(q, q)


def effect_of_swap(swap_qubits: Tuple[cirq.GridQubit, cirq.GridQubit],
gate_qubits: Tuple[cirq.GridQubit, cirq.GridQubit]) -> int:
"""Returns the net effect of a swap on the distance between a gate's qubits.

Note that this returns >0 if the distance would decrease and <0 if it would
increase, which is somewhat counter-intuitive.

Args:
swap_qubits: the pair of qubits to swap
gate_qubits: the pair of qubits that the gate operates on
"""
gate_after = map(swap_map_fn(*swap_qubits), gate_qubits)
# TODO(https://github.com/quantumlib/ReCirq/issues/149):
# Using manhattan distance only works for grid devices when all qubits
# are usable (no holes, hanging strips, or disconnected qubits).
# Update this to use the shortest path length computed on the device's
# connectivity graph (ex using the output of Floyd-Warshall).
return manhattan_dist(*gate_qubits) - manhattan_dist(*gate_after)


class QubitMapping:
"""Data structure representing a 1:1 map between logical and physical GridQubits.

Args:
initial_mapping: initial logical-to-physical qubit map.
"""
def __init__(self, initial_mapping: Dict[cirq.Qid, cirq.GridQubit] = {}):
self.logical_to_physical = initial_mapping
self.physical_to_logical = {v: k for k, v in initial_mapping.items()}

def swap_physical(self, q1: cirq.GridQubit, q2: cirq.GridQubit) -> None:
"""Updates the mapping by swapping two physical qubits."""
logical_q1 = self.physical_to_logical.get(q1)
logical_q2 = self.physical_to_logical.get(q2)
self.physical_to_logical[q1], self.physical_to_logical[
q2] = logical_q2, logical_q1
self.logical_to_physical[logical_q1], self.logical_to_physical[
logical_q2] = q2, q1

def logical(self, qubit: cirq.GridQubit) -> cirq.Qid:
"""Returns the logical qubit for a given physical qubit."""
return self.physical_to_logical.get(qubit)

def physical(self, qubit: cirq.Qid) -> cirq.GridQubit:
"""Returns the physical qubit for a given logical qubit."""
return self.logical_to_physical.get(qubit)


class DependencyLists:
"""Data structure representing the interdependencies between qubits and
gates in a circuit.

The DependencyLists maps qubits to linked lists of gates that depend on that
qubit in execution order.
Additionally, the DependencyLists can compute the MCPE heuristic cost
function for candidate qubit swaps.
"""
def __init__(self, circuit: cirq.Circuit):
self.dependencies = defaultdict(deque)
for moment in circuit:
for operation in moment:
for qubit in operation.qubits:
self.dependencies[qubit].append(operation)

def peek_front(self, qubit: cirq.Qid) -> Iterable[cirq.Operation]:
"""Returns the first gate in a qubit's dependency list."""
return self.dependencies[qubit][0]

def pop_front(self, qubit: cirq.Qid) -> None:
"""Removes the first gate in a qubit's dependency list."""
self.dependencies[qubit].popleft()

def empty(self, qubit: cirq.Qid) -> bool:
"""Returns true iff the qubit's dependency list is empty."""
return qubit not in self.dependencies or not self.dependencies[qubit]

def all_empty(self) -> bool:
"""Returns true iff all dependency lists are empty."""
return all(len(dlist) == 0 for dlist in self.dependencies.values())

def active_gates(self) -> Set[cirq.Operation]:
"""Returns the currently active gates of the circuit represented by the
dependency lists.

The active gates are the ones which operate on qubits that have no other
preceding gate operations.
"""
ret = set()
# Recomputing the active gates from scratch is less efficient than
# maintaining them as the dependency lists are updated (e.g. maintaining
# additional state for front gates, active gates, and frozen qubits).
# This can be optimized later if necessary.
for dlist in self.dependencies.values():
if not dlist:
continue
gate = dlist[0]
if gate in ret:
continue
if all(not self.empty(q) and self.peek_front(q) == gate
for q in gate.qubits):
ret.add(gate)
return ret

def _maximum_consecutive_positive_effect_impl(
self, swap_q1: cirq.GridQubit, swap_q2: cirq.GridQubit,
gates: Iterable[cirq.Operation], mapping: QubitMapping) -> int:
"""Computes the MCPE contribution from a single qubit's dependency list.

This is where the dynamic look-ahead window is applied -- the window of
gates that contribute to the MCPE ends after the first gate encountered
which would be made worse by applying the swap (the first one with
effect_of_swap() < 0).

Args:
swap_q1: the source qubit to swap
swap_q2: the target qubit to swap
gates: the dependency list of gate operations on logical qubits
mapping: the mapping between logical and physical qubits for gates
"""
total_cost = 0
for gate in gates:
if len(gate.qubits) > 2:
raise ValueError(
"Cannot compute maximum consecutive positive effect on gates with >2 qubits."
)
if len(gate.qubits) != 2:
# Single-qubit gates would not be affected by the swap. We can
# treat the change in cost as 0 for those.
continue
swap_cost = effect_of_swap((swap_q1, swap_q2),
tuple(map(mapping.physical,
gate.qubits)))
if swap_cost < 0:
break
total_cost += swap_cost
return total_cost

def maximum_consecutive_positive_effect(self, swap_q1: cirq.GridQubit,
swap_q2: cirq.GridQubit,
mapping: QubitMapping) -> int:
"""Computes the MCPE heuristic cost function of applying the swap to the
circuit represented by this set of DependencyLists.

Args:
swap_q1: the source qubit to swap
swap_q2: the target qubit to swap
mapping: the mapping between logical and physical qubits for gate in the dependency lists
"""
return sum(
self._maximum_consecutive_positive_effect_impl(
swap_q1, swap_q2, self.dependencies[mapping.logical(q)],
mapping) for q in (swap_q1, swap_q2))
177 changes: 177 additions & 0 deletions recirq/quantum_chess/mcpe_utils_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,177 @@
from collections import deque

import pytest
import cirq

import recirq.quantum_chess.mcpe_utils as mcpe


def test_manhattan_distance():
assert mcpe.manhattan_dist(cirq.GridQubit(0, 0), cirq.GridQubit(0, 0)) == 0
assert mcpe.manhattan_dist(cirq.GridQubit(1, 2), cirq.GridQubit(1, 2)) == 0
assert mcpe.manhattan_dist(cirq.GridQubit(1, 2), cirq.GridQubit(3, 4)) == 4
assert mcpe.manhattan_dist(cirq.GridQubit(3, 4), cirq.GridQubit(1, 2)) == 4
assert mcpe.manhattan_dist(cirq.GridQubit(-1, 2), cirq.GridQubit(3,
-4)) == 10


def test_swap_map_fn():
x, y, z = (cirq.NamedQubit(f'q{i}') for i in range(3))
swap = mcpe.swap_map_fn(x, y)
assert swap(x) == y
assert swap(y) == x
assert swap(z) == z
assert cirq.Circuit(cirq.ISWAP(x, z), cirq.ISWAP(y, z), cirq.ISWAP(
x, y)).transform_qubits(swap) == cirq.Circuit(cirq.ISWAP(y, z),
cirq.ISWAP(x, z),
cirq.ISWAP(y, x))


def test_effect_of_swap():
a1, a2, a3, b1, b2, b3 = cirq.GridQubit.rect(2, 3)
# If there's a gate operating on (a1, a3), then swapping a1 and a2 will
# bring the gate's qubits closer together by 1.
assert mcpe.effect_of_swap((a1, a2), (a1, a3)) == 1
# In reverse, a gate operating on (a2, a3) will get worse by 1 when swapping
# (a1, a2).
assert mcpe.effect_of_swap((a1, a2), (a2, a3)) == -1
# If the qubits to be swapped are completely independent of the gate's
# qubits, then there's no effect on the gate.
assert mcpe.effect_of_swap((a1, a2), (b1, b2)) == 0
# We can also measure the effect of swapping non-adjacent qubits (although
# we would never be able to do this with a real SWAP gate).
assert mcpe.effect_of_swap((a1, a3), (a1, b3)) == 2


def test_peek():
x, y, z = (cirq.NamedQubit(f'q{i}') for i in range(3))
g = [cirq.ISWAP(x, y), cirq.ISWAP(x, z), cirq.ISWAP(y, z)]
dlists = mcpe.DependencyLists(cirq.Circuit(g))
assert dlists.peek_front(x) == g[0]
assert dlists.peek_front(y) == g[0]
assert dlists.peek_front(z) == g[1]


def test_pop():
x, y, z = (cirq.NamedQubit(f'q{i}') for i in range(3))
g = [cirq.ISWAP(x, y), cirq.ISWAP(x, z), cirq.ISWAP(y, z)]
dlists = mcpe.DependencyLists(cirq.Circuit(g))

assert dlists.peek_front(x) == g[0]
dlists.pop_front(x)
assert dlists.peek_front(x) == g[1]


def test_empty():
x, y, z = (cirq.NamedQubit(f'q{i}') for i in range(3))
dlists = mcpe.DependencyLists(
cirq.Circuit(cirq.ISWAP(x, y), cirq.ISWAP(x, z), cirq.ISWAP(y, z)))

assert not dlists.empty(x)
dlists.pop_front(x)
assert not dlists.empty(x)
dlists.pop_front(x)
assert dlists.empty(x)

assert not dlists.all_empty()
dlists.pop_front(y)
dlists.pop_front(y)
dlists.pop_front(z)
dlists.pop_front(z)
assert dlists.all_empty()


def test_active_gates():
w, x, y, z = (cirq.NamedQubit(f'q{i}') for i in range(4))
dlists = mcpe.DependencyLists(
cirq.Circuit(cirq.ISWAP(x, y), cirq.ISWAP(y, z), cirq.X(w)))

assert dlists.active_gates() == {cirq.ISWAP(x, y), cirq.X(w)}


def test_physical_mapping():
q = list(cirq.NamedQubit(f'q{i}') for i in range(6))
Q = list(cirq.GridQubit(row, col) for row in range(2) for col in range(3))
mapping = mcpe.QubitMapping(dict(zip(q, Q)))
assert list(map(mapping.physical, q)) == Q
assert cirq.ISWAP(q[1],
q[5]).transform_qubits(mapping.physical) == cirq.ISWAP(
Q[1], Q[5])


def test_swap():
q = list(cirq.NamedQubit(f'q{i}') for i in range(6))
Q = list(cirq.GridQubit(row, col) for row in range(2) for col in range(3))
mapping = mcpe.QubitMapping(dict(zip(q, Q)))

mapping.swap_physical(Q[0], Q[1])
g = cirq.CNOT(q[0], q[2])
assert g.transform_qubits(mapping.physical) == cirq.CNOT(Q[1], Q[2])

mapping.swap_physical(Q[2], Q[3])
mapping.swap_physical(Q[1], Q[4])
assert g.transform_qubits(mapping.physical) == cirq.CNOT(Q[4], Q[3])


def test_mcpe_example_8():
# This test is example 8 from the circuit in figure 9 of
# https://ieeexplore.ieee.org/abstract/document/8976109.
q = list(cirq.NamedQubit(f'q{i}') for i in range(6))
Q = list(cirq.GridQubit(row, col) for row in range(2) for col in range(3))
mapping = mcpe.QubitMapping(dict(zip(q, Q)))
dlists = mcpe.DependencyLists(
cirq.Circuit(cirq.CNOT(q[0], q[2]), cirq.CNOT(q[5], q[2]),
cirq.CNOT(q[0], q[5]), cirq.CNOT(q[4], q[0]),
cirq.CNOT(q[0], q[3]), cirq.CNOT(q[5], q[0]),
cirq.CNOT(q[3], q[1])))

assert dlists.maximum_consecutive_positive_effect(Q[0], Q[1], mapping) == 4


def test_mcpe_example_9():
# This test is example 9 from the circuit in figures 9 and 10 of
# https://ieeexplore.ieee.org/abstract/document/8976109.
q = list(cirq.NamedQubit(f'q{i}') for i in range(6))
Q = list(cirq.GridQubit(row, col) for row in range(2) for col in range(3))
mapping = mcpe.QubitMapping(dict(zip(q, Q)))
dlists = mcpe.DependencyLists(
cirq.Circuit(cirq.CNOT(q[0], q[2]), cirq.CNOT(q[5], q[2]),
cirq.CNOT(q[0], q[5]), cirq.CNOT(q[4], q[0]),
cirq.CNOT(q[0], q[3]), cirq.CNOT(q[5], q[0]),
cirq.CNOT(q[3], q[1])))

# At first CNOT(q0, q2) is the active gate.
assert dlists.active_gates() == {cirq.CNOT(q[0], q[2])}
# The swaps connected to either q0 or q2 to consider are:
# (Q0, Q1), (Q0, Q3), (Q1, Q2), (Q2, Q5)
# Of these, (Q0, Q3) and (Q2, Q5) can be discarded because they would
# negatively impact the active CNOT(q0, q2) gate.
assert mcpe.effect_of_swap((Q[0], Q[3]), (Q[0], Q[2])) < 0
assert mcpe.effect_of_swap((Q[2], Q[5]), (Q[0], Q[2])) < 0
# The remaining candidate swaps are: (Q0, Q1) and (Q1, Q2)
# (Q0, Q1) has a higher MCPE, so it looks better to apply that one.
assert dlists.maximum_consecutive_positive_effect(Q[0], Q[1], mapping) == 4
assert dlists.maximum_consecutive_positive_effect(Q[1], Q[2], mapping) == 1
mapping.swap_physical(Q[0], Q[1])

# The swap-update algorithm would now advance beyond the front-most gates that
# now satisfy adjacency constraints after the swap -- the CNOT(q0, q2) and
# CNOT(q5, q2)
assert dlists.active_gates() == {cirq.CNOT(q[0], q[2])}
dlists.pop_front(q[0])
dlists.pop_front(q[2])
assert dlists.active_gates() == {cirq.CNOT(q[5], q[2])}
dlists.pop_front(q[5])
dlists.pop_front(q[2])

# Now the active gate is g2 (which is CNOT(q0, q5))
assert dlists.active_gates() == {cirq.CNOT(q[0], q[5])}
# For this active gate, the swaps to consider are:
# (Q0, Q1), (Q1, Q2), (Q1, Q4), (Q2, Q5), (Q4, Q5)
# (Q0, Q1) can be discarded because it negatively impacts the active gate.
assert mcpe.effect_of_swap((Q[0], Q[1]), (Q[1], Q[5])) < 0
# Of the remaining candidate swaps, (Q0, Q4) has the highest MCPE.
assert dlists.maximum_consecutive_positive_effect(Q[1], Q[2], mapping) == 1
assert dlists.maximum_consecutive_positive_effect(Q[1], Q[4], mapping) == 3
assert dlists.maximum_consecutive_positive_effect(Q[2], Q[5], mapping) == 2
assert dlists.maximum_consecutive_positive_effect(Q[4], Q[5], mapping) == 2
Loading