From 6d785c726132f67523fd990685a3ba41ab96ad4a Mon Sep 17 00:00:00 2001 From: Mario Dagrada Date: Mon, 2 Oct 2023 13:46:29 +0200 Subject: [PATCH] Added main utility modules * state preparation * logging * register * execution helpers * qubit support * serialization * typing * wavefunction overlaps Co-authored-by: Aleksander Wennersteen Co-authored-by: Mario Dagrada Co-authored-by: Vincent Elfving Co-authored-by: Dominik Seitz Co-authored-by: Joao Moutinho Co-authored-by: Vytautas Abramavicius Co-authored-by: Niklas Heim Co-authored-by: Roland Guichard --- qadence/__init__.py | 65 +++++ qadence/circuit.py | 204 ++++++++++++++ qadence/divergences.py | 37 +++ qadence/execution.py | 264 +++++++++++++++++++ qadence/extensions.py | 99 +++++++ qadence/logger.py | 35 +++ qadence/overlap.py | 453 +++++++++++++++++++++++++++++++ qadence/py.typed | 0 qadence/qubit_support.py | 61 +++++ qadence/register.py | 223 ++++++++++++++++ qadence/serialization.py | 352 +++++++++++++++++++++++++ qadence/states.py | 557 +++++++++++++++++++++++++++++++++++++++ qadence/types.py | 345 ++++++++++++++++++++++++ qadence/utils.py | 213 +++++++++++++++ 14 files changed, 2908 insertions(+) create mode 100644 qadence/__init__.py create mode 100644 qadence/circuit.py create mode 100644 qadence/divergences.py create mode 100644 qadence/execution.py create mode 100644 qadence/extensions.py create mode 100644 qadence/logger.py create mode 100644 qadence/overlap.py create mode 100644 qadence/py.typed create mode 100644 qadence/qubit_support.py create mode 100644 qadence/register.py create mode 100644 qadence/serialization.py create mode 100644 qadence/states.py create mode 100644 qadence/types.py create mode 100644 qadence/utils.py diff --git a/qadence/__init__.py b/qadence/__init__.py new file mode 100644 index 00000000..c4f673fe --- /dev/null +++ b/qadence/__init__.py @@ -0,0 +1,65 @@ +from __future__ import annotations + +from importlib import import_module + +from torch import cdouble, set_default_dtype +from torch import float64 as torchfloat64 + +from .backend import * +from .backends import * +from .blocks import * +from .circuit import * +from .constructors import * +from .errors import * +from .execution import * +from .measurements import * +from .ml_tools import * +from .models import * +from .operations import * +from .overlap import * +from .parameters import * +from .register import * +from .serialization import * +from .states import * +from .transpile import * +from .types import * +from .utils import * + +DEFAULT_FLOAT_DTYPE = torchfloat64 +DEFAULT_COMPLEX_DTYPE = cdouble +set_default_dtype(DEFAULT_FLOAT_DTYPE) +""" +The imports above fetch the functions defined in the __all__ of each sub-module +to the qadence name space. Make sure each added submodule has the respective definition: + + - `__all__ = ["function0", "function1", ...]` + +Furthermore, add the submodule to the list below to automatically build +the __all__ of the qadence namespace. Make sure to keep alphabetical ordering. +""" + +list_of_submodules = [ + ".backends", + ".blocks", + ".circuit", + ".constructors", + ".errors", + ".execution", + ".measurements", + ".ml_tools", + ".models", + ".operations", + ".overlap", + ".parameters", + ".register", + ".serialization", + ".states", + ".transpile", + ".types", + ".utils", +] + +__all__ = [] +for submodule in list_of_submodules: + __all_submodule__ = getattr(import_module(submodule, package="qadence"), "__all__") + __all__ += __all_submodule__ diff --git a/qadence/circuit.py b/qadence/circuit.py new file mode 100644 index 00000000..1ad4b487 --- /dev/null +++ b/qadence/circuit.py @@ -0,0 +1,204 @@ +from __future__ import annotations + +from dataclasses import dataclass +from itertools import chain as flatten +from pathlib import Path +from typing import Iterable + +from sympy import Array, Basic + +from qadence.blocks import AbstractBlock, AnalogBlock, CompositeBlock, chain +from qadence.blocks.utils import parameters, primitive_blocks +from qadence.parameters import Parameter +from qadence.register import Register + +# Modules to be automatically added to the qadence namespace +__all__ = ["QuantumCircuit"] + + +@dataclass(eq=False) # Avoid unhashability errors due to mutable attributes. +class QuantumCircuit: + """A QuantumCircuit instance is completely abstract and it needs to be passed to a quantum + backend in order to be executed. + """ + + block: AbstractBlock + register: Register + + def __init__(self, support: int | Register, *blocks: AbstractBlock): + """ + Arguments: + support: `Register` or number of qubits. If an integer is provided, a register is + constructed with `Register.all_to_all(x)` + *blocks: (Possibly multiple) blocks to construct the circuit from. + """ + self.block = chain(*blocks) if len(blocks) != 1 else blocks[0] + self.register = Register(support) if isinstance(support, int) else support + + global_block = isinstance(self.block, AnalogBlock) and self.block.qubit_support.is_global + if not global_block and len(self.block) and self.block.n_qubits > self.register.n_qubits: + raise ValueError( + f"Register with {self.register.n_qubits} qubits is too small for the " + f"given block with {self.block.n_qubits} qubits" + ) + + @property + def n_qubits(self) -> int: + return self.register.n_qubits + + def __eq__(self, other: object) -> bool: + if not isinstance(other, QuantumCircuit): + raise TypeError(f"Cannot compare {type(self)} to {type(other)}.") + if self.block != other.block: # type: ignore[call-overload] + return False + if self.register != other.register: + return False + return True + + def __hash__(self) -> int: + return hash(self._to_json()) + + def __iter__(self) -> Iterable: + if isinstance(self.block, CompositeBlock): + yield from self.block + else: + yield self.block + + def __contains__(self, other: object) -> bool: + if isinstance(other, AbstractBlock): + if isinstance(self.block, CompositeBlock): + return other in self.block + else: + return other == self.block + elif isinstance(other, Parameter): + return other in self.unique_parameters + else: + raise TypeError(f"Cant compare {type(self)} to {type(other)}") + + @property + def unique_parameters(self) -> list[Parameter]: + """Return the unique parameters in the circuit + + These parameters are the actual user-facing parameters which + can be assigned by the user. Multiple gates can contain the + same unique parameter + + Returns: + list[Parameter]: List of unique parameters in the circuit + """ + symbols = [] + for p in parameters(self.block): + if isinstance(p, Array): + continue + elif not p.is_number and p not in symbols: + symbols.append(p) + return symbols + + @property + def num_unique_parameters(self) -> int: + return len(self.unique_parameters) if self.unique_parameters else 0 + + @property + def num_parameters(self) -> int: + return len(self.parameters()) + + def parameters(self) -> list[Parameter | Basic] | list[tuple[Parameter | Basic, ...]]: + """Extract all parameters for primitive blocks in the circuit + + Notice that this function returns all the unique Parameters used + in the quantum circuit. These can correspond to constants too. + + Returns: + List[tuple[Parameter]]: A list of tuples containing the Parameter + instance of each of the primitive blocks in the circuit or, if the `flatten` + flag is set to True, a flattened list of all circuit parameters + """ + return parameters(self.block) + + def get_blocks_by_tag(self, tag: str) -> list[AbstractBlock]: + """Extract one or more blocks using the human-readable tag + + This function recurservily explores all composite blocks to find + all the occurrences of a certain tag in the blocks + + Args: + tag (str): the tag to look for + + Returns: + list[AbstractBlock]: The block(s) corresponding to the given tag + """ + + def _get_block(block: AbstractBlock) -> list[AbstractBlock]: + blocks = [] + if block.tag == tag: + blocks += [block] + if isinstance(block, CompositeBlock): + blocks += flatten(*[_get_block(b) for b in block.blocks]) + return blocks + + return _get_block(self.block) + + def is_empty(self) -> bool: + return len(primitive_blocks(self.block)) == 0 + + def serialize(self) -> str: + raise NotImplementedError + + @staticmethod + def deserialize(json: str) -> QuantumCircuit: + raise NotImplementedError + + def __repr__(self) -> str: + return self.block.__repr__() + + def _to_dict(self) -> dict: + return { + "block": self.block._to_dict(), + "register": self.register._to_dict(), + } + + def _to_json(self, path: Path | str | None = None) -> str: + import json + + qc_dumped = json.dumps(self._to_dict()) + if path is not None: + path = Path(path) + try: + with open(path, "w") as file: + file.write(qc_dumped) + except Exception as e: + print(f"Unable to write QuantumCircuit to disk due to {e}") + + return qc_dumped + + @classmethod + def _from_dict(cls, d: dict) -> QuantumCircuit: + from qadence import blocks as qadenceblocks + from qadence import operations + + RootBlock = ( + getattr(operations, d["block"]["type"]) + if hasattr(operations, d["block"]["type"]) + else getattr(qadenceblocks, d["block"]["type"]) + ) + + return QuantumCircuit( + Register._from_dict(d["register"]), + RootBlock._from_dict(d["block"]), + ) + + @classmethod + def _from_json(cls, path: str | Path) -> QuantumCircuit: + import json + + loaded_dict: dict = {} + if isinstance(path, str): + path = Path(path) + try: + with open(path, "r") as file: + loaded_dict = json.load(file) + + except Exception as e: + print(f"Unable to load QuantumCircuit due to {e}") + + return QuantumCircuit._from_dict(loaded_dict) diff --git a/qadence/divergences.py b/qadence/divergences.py new file mode 100644 index 00000000..95bc24d7 --- /dev/null +++ b/qadence/divergences.py @@ -0,0 +1,37 @@ +from __future__ import annotations + +from collections import Counter + +import numpy as np + + +def shannon_entropy(counter: Counter) -> float: + return float(-np.sum([count * np.log(count) for count in counter.values()])) + + +def js_divergence(counter_p: Counter, counter_q: Counter) -> float: + """ + Compute the Jensen-Shannon divergence between two probability distributions + represented as Counter objects. + The JSD is calculated using only the shared keys between the two input Counter objects. + + Args: + counter_p (Counter): Counter of bitstring counts for probability mass function P. + counter_q (Counter): Counter of bitstring counts for probability mass function Q. + + Returns: + float: The Jensen-Shannon divergence between counter_p and counter_q. + """ + # Normalise counters + normalisation_p = np.sum([count for count in counter_p.values()]) + normalisation_q = np.sum([count for count in counter_q.values()]) + counter_p = Counter({k: v / normalisation_p for k, v in counter_p.items()}) + counter_q = Counter({k: v / normalisation_q for k, v in counter_q.items()}) + + average_proba_counter = counter_p + counter_q + average_proba_counter = Counter({k: v / 2.0 for k, v in average_proba_counter.items()}) + average_entropy = shannon_entropy(average_proba_counter) + + entropy_p = shannon_entropy(counter_p) + entropy_q = shannon_entropy(counter_q) + return float(average_entropy - (entropy_p + entropy_q) / 2.0) diff --git a/qadence/execution.py b/qadence/execution.py new file mode 100644 index 00000000..ecbeb2ff --- /dev/null +++ b/qadence/execution.py @@ -0,0 +1,264 @@ +from __future__ import annotations + +from collections import Counter +from functools import singledispatch +from typing import Any, Union + +from torch import Tensor, no_grad + +from qadence import backend_factory +from qadence.backend import BackendConfiguration, BackendName +from qadence.blocks import AbstractBlock +from qadence.circuit import QuantumCircuit +from qadence.register import Register +from qadence.types import DiffMode +from qadence.utils import Endianness + +# Modules to be automatically added to the qadence namespace +__all__ = ["run", "sample", "expectation"] + + +@singledispatch +def run( + x: Union[QuantumCircuit, AbstractBlock, Register, int], + *args: Any, + values: dict = {}, + state: Tensor = None, + backend: BackendName = BackendName.PYQTORCH, + endianness: Endianness = Endianness.BIG, + configuration: Union[BackendConfiguration, dict, None] = None, +) -> Tensor: + """Convenience wrapper for the `QuantumModel.run` method. This is a + `functools.singledispatch`ed function so it can be called with a number of different arguments. + See the examples of the [`expectation`][qadence.execution.expectation] function. This function + works exactly the same. + + Arguments: + x: Circuit, block, or (register+block) to run. + values: User-facing parameter dict. + state: Initial state. + backend: Name of the backend to run on. + endianness: The target device endianness. + configuration: The backend configuration. + + Returns: + A wavefunction + """ + raise ValueError(f"Cannot run {type(x)}") + + +@run.register +def _( + circuit: QuantumCircuit, + values: dict = {}, + state: Tensor = None, + backend: BackendName = BackendName.PYQTORCH, + endianness: Endianness = Endianness.BIG, + configuration: Union[BackendConfiguration, dict, None] = None, +) -> Tensor: + bknd = backend_factory(backend, configuration=configuration) + conv = bknd.convert(circuit) + with no_grad(): + return bknd.run( + circuit=conv.circuit, + param_values=conv.embedding_fn(conv.params, values), + state=state, + endianness=endianness, + ) + + +@run.register +def _(register: Register, block: AbstractBlock, **kwargs: Any) -> Tensor: + return run(QuantumCircuit(register, block), **kwargs) + + +@run.register +def _(n_qubits: int, block: AbstractBlock, **kwargs: Any) -> Tensor: + return run(Register(n_qubits), block, **kwargs) + + +@run.register +def _(block: AbstractBlock, **kwargs: Any) -> Tensor: + return run(Register(block.n_qubits), block, **kwargs) + + +@singledispatch +def sample( + x: Union[QuantumCircuit, AbstractBlock, Register, int], + *args: Any, + values: dict = {}, + state: Union[Tensor, None] = None, + n_shots: int = 100, + backend: BackendName = BackendName.PYQTORCH, + endianness: Endianness = Endianness.BIG, + configuration: Union[BackendConfiguration, dict, None] = None, +) -> list[Counter]: + """Convenience wrapper for the `QuantumModel.sample` method. This is a + `functools.singledispatch`ed function so it can be called with a number of different arguments. + See the examples of the [`expectation`][qadence.execution.expectation] function. This function + works exactly the same. + + Arguments: + x: Circuit, block, or (register+block) to run. + values: User-facing parameter dict. + state: Initial state. + n_shots: Number of shots per element in the batch. + backend: Name of the backend to run on. + endianness: The target device endianness. + configuration: The backend configuration. + + Returns: + A list of Counter instances with the sample results + """ + raise ValueError(f"Cannot sample from {type(x)}") + + +@sample.register +def _( + circuit: QuantumCircuit, + values: dict = {}, + state: Union[Tensor, None] = None, + n_shots: int = 100, + backend: BackendName = BackendName.PYQTORCH, + endianness: Endianness = Endianness.BIG, + configuration: Union[BackendConfiguration, dict, None] = None, +) -> list[Counter]: + bknd = backend_factory(backend, configuration=configuration) + conv = bknd.convert(circuit) + return bknd.sample( + circuit=conv.circuit, + param_values=conv.embedding_fn(conv.params, values), + n_shots=n_shots, + state=state, + endianness=endianness, + ) + + +@sample.register +def _(register: Register, block: AbstractBlock, **kwargs: Any) -> Tensor: + return sample(QuantumCircuit(register, block), **kwargs) + + +@sample.register +def _(n_qubits: int, block: AbstractBlock, **kwargs: Any) -> Tensor: + return sample(Register(n_qubits), block, **kwargs) + + +@sample.register +def _(block: AbstractBlock, **kwargs: Any) -> Tensor: + reg = Register(block.n_qubits) + return sample(reg, block, **kwargs) + + +@singledispatch +def expectation( + x: Union[QuantumCircuit, AbstractBlock, Register, int], + observable: Union[list[AbstractBlock], AbstractBlock], + values: dict = {}, + state: Tensor = None, + backend: BackendName = BackendName.PYQTORCH, + diff_mode: Union[DiffMode, str, None] = None, + endianness: Endianness = Endianness.BIG, + configuration: Union[BackendConfiguration, dict, None] = None, +) -> Tensor: + """Convenience wrapper for the `QuantumModel.expectation` method. This is a + `functools.singledispatch`ed function so it can be called with a number of different arguments + (see in the examples). + + Arguments: + x: Circuit, block, or (register+block) to run. + observable: Observable(s) w.r.t. which the expectation is computed. + values: User-facing parameter dict. + state: Initial state. + backend: Name of the backend to run on. + diff_mode: Which differentiation mode to use. + endianness: The target device endianness. + configuration: The backend configuration. + + Returns: + A wavefunction + + + ```python exec="on" source="material-block" + from qadence import RX, Z, Register, QuantumCircuit, expectation + + reg = Register(1) + block = RX(0, 0.5) + observable = Z(0) + circ = QuantumCircuit(reg, block) + + # You can compute the expectation for a + # QuantumCircuit with a given observable. + expectation(circ, observable) + + # You can also use only a block. + # In this case the register is constructed automatically to + # Register.line(block.n_qubits) + expectation(block, observable) + + # Or a register and block + expectation(reg, block, observable) + ```""" + + raise ValueError(f"Cannot execute {type(x)}") + + +@expectation.register +def _( + circuit: QuantumCircuit, + observable: Union[list[AbstractBlock], AbstractBlock], + values: dict = {}, + state: Tensor = None, + backend: BackendName = BackendName.PYQTORCH, + diff_mode: Union[DiffMode, str, None] = None, + endianness: Endianness = Endianness.BIG, + configuration: Union[BackendConfiguration, dict, None] = None, +) -> Tensor: + observable = observable if isinstance(observable, list) else [observable] + bknd = backend_factory(backend, configuration=configuration) + conv = bknd.convert(circuit, observable) + + def _expectation() -> Tensor: + return bknd.expectation( + circuit=conv.circuit, + observable=conv.observable, # type: ignore[arg-type] + param_values=conv.embedding_fn(conv.params, values), + state=state, + endianness=endianness, + ) + + # Do not compute gradients if no diff_mode is provided. + if diff_mode is None: + with no_grad(): + return _expectation() + else: + return _expectation() + + +@expectation.register +def _( + register: Register, + block: AbstractBlock, + observable: Union[list[AbstractBlock], AbstractBlock], + **kwargs: Any, +) -> Tensor: + return expectation(QuantumCircuit(register, block), observable, **kwargs) + + +@expectation.register +def _( + n_qubits: int, + block: AbstractBlock, + observable: Union[list[AbstractBlock], AbstractBlock], + **kwargs: Any, +) -> Tensor: + reg = Register(n_qubits) + return expectation(QuantumCircuit(reg, block), observable, **kwargs) + + +@expectation.register +def _( + block: AbstractBlock, observable: Union[list[AbstractBlock], AbstractBlock], **kwargs: Any +) -> Tensor: + reg = Register(block.n_qubits) + return expectation(QuantumCircuit(reg, block), observable, **kwargs) diff --git a/qadence/extensions.py b/qadence/extensions.py new file mode 100644 index 00000000..dd564c44 --- /dev/null +++ b/qadence/extensions.py @@ -0,0 +1,99 @@ +from __future__ import annotations + +import importlib +from string import Template +from typing import TypeVar + +from qadence.backend import Backend +from qadence.blocks import ( + AbstractBlock, +) +from qadence.types import BackendName, DiffMode + +TAbstractBlock = TypeVar("TAbstractBlock", bound=AbstractBlock) + +backends_namespace = Template("qadence.backends.$name") + + +def _available_backends() -> dict: + res = {} + for backend in BackendName.list(): + module_path = f"qadence.backends.{backend}.backend" + try: + module = importlib.import_module(module_path) + BackendCls = getattr(module, "Backend") + res[backend] = BackendCls + except (ImportError, ModuleNotFoundError): + pass + + return res + + +def _supported_gates(name: BackendName | str) -> list[TAbstractBlock]: + from qadence import operations + + name = str(BackendName(name).name.lower()) + + try: + backend_namespace = backends_namespace.substitute(name=name) + module = importlib.import_module(backend_namespace) + except KeyError: + pass + _supported_gates = getattr(module, "supported_gates", None) + assert ( + _supported_gates is not None + ), f"{name} backend should define a 'supported_gates' variable" + return [getattr(operations, gate) for gate in _supported_gates] + + +def _gpsr_fns() -> dict: + # avoid circular import + from qadence.backends.gpsr import general_psr + + return {DiffMode.GPSR: general_psr} + + +def _validate_diff_mode(backend: Backend, diff_mode: DiffMode) -> None: + if not backend.supports_ad and diff_mode == DiffMode.AD: + raise TypeError(f"Backend {backend.name} does not support diff_mode {DiffMode.AD}.") + + +def _set_backend_config(backend: Backend, diff_mode: DiffMode) -> None: + """_summary_ + + Args: + backend (Backend): _description_ + diff_mode (DiffMode): _description_ + """ + + _validate_diff_mode(backend, diff_mode) + + if not backend.supports_ad or diff_mode != DiffMode.AD: + backend.config._use_gate_params = True + + # (1) When using PSR with any backend or (2) we use the backends Pulser or Braket, + # we have to use gate-level parameters + + else: + assert diff_mode == DiffMode.AD + backend.config._use_gate_params = False + # We can use expression-level parameters for AD. + if backend.name == BackendName.PYQTORCH: + backend.config.use_single_qubit_composition = True + + # For pyqtorch, we enable some specific transpilation passes. + + +# if proprietary qadence_plus is available import the +# right function since more backends are supported +try: + module = importlib.import_module("qadence_extensions.extensions") + available_backends = getattr(module, "available_backends") + supported_gates = getattr(module, "supported_gates") + get_gpsr_fns = getattr(module, "gpsr_fns") + set_backend_config = getattr(module, "set_backend_config") +except ModuleNotFoundError: + available_backends = _available_backends + supported_gates = _supported_gates + get_gpsr_fns = _gpsr_fns + set_backend_config = _set_backend_config diff --git a/qadence/logger.py b/qadence/logger.py new file mode 100644 index 00000000..3963e2d9 --- /dev/null +++ b/qadence/logger.py @@ -0,0 +1,35 @@ +from __future__ import annotations + +import logging +import os +import sys + +logging_levels = { + "DEBUG": logging.DEBUG, + "INFO": logging.INFO, + "WARNING": logging.WARNING, + "ERROR": logging.ERROR, + "CRITICAL": logging.CRITICAL, +} + +LOG_STREAM_HANDLER = sys.stdout + +DEFAULT_LOGGING_LEVEL = logging.INFO + +# FIXME: introduce a better handling of the configuration +LOGGING_LEVEL = os.environ.get("LOGGING_LEVEL", "warning").upper() + + +def get_logger(name: str) -> logging.Logger: + logger: logging.Logger = logging.getLogger(name) + + level = logging_levels.get(LOGGING_LEVEL, DEFAULT_LOGGING_LEVEL) + logger.setLevel(level) + + formatter = logging.Formatter("%(levelname) -5s %(asctime)s: %(message)s", "%Y-%m-%d %H:%M:%S") + # formatter = logging.Formatter(LOG_FORMAT) + sh = logging.StreamHandler(LOG_STREAM_HANDLER) + sh.setFormatter(formatter) + logger.addHandler(sh) + + return logger diff --git a/qadence/overlap.py b/qadence/overlap.py new file mode 100644 index 00000000..c5fa9615 --- /dev/null +++ b/qadence/overlap.py @@ -0,0 +1,453 @@ +from __future__ import annotations + +from collections import Counter +from typing import Any, Callable + +import numpy as np +import torch +from torch import Tensor + +from qadence.backend import BackendConfiguration, BackendName +from qadence.backends.pytorch_wrapper import DiffMode +from qadence.blocks import AbstractBlock, chain, kron, tag +from qadence.circuit import QuantumCircuit +from qadence.divergences import js_divergence +from qadence.measurements import Measurements +from qadence.models import QuantumModel +from qadence.operations import SWAP, H, I, S, Z +from qadence.transpile import reassign +from qadence.types import OverlapMethod + +# Modules to be automatically added to the qadence namespace +__all__ = ["Overlap", "OverlapMethod"] + + +def _cswap(control: int, target1: int, target2: int) -> AbstractBlock: + # define projectors on control qubit + p0 = 0.5 * I(control) + 0.5 * Z(control) + p1 = 0.5 * I(control) + (-0.5) * Z(control) + + # construct controlled-SWAP block + cswap_blocks = kron(p0, I(target1), I(target2)) + kron(p1, SWAP(target1, target2)) + cswap = tag(cswap_blocks, f"CSWAP({control}, {target1}, {target2})") + + return cswap + + +def _controlled_unitary(control: int, unitary_block: AbstractBlock) -> AbstractBlock: + n_qubits = unitary_block.n_qubits + + # define projectors on control qubit + p0 = 0.5 * I(control) + 0.5 * Z(control) + p1 = 0.5 * I(control) + (-0.5) * Z(control) + + # shift qubit support of unitary + shifted_unitary_block = reassign(unitary_block, {i: control + i + 1 for i in range(n_qubits)}) + + # construct controlled-U block + cu_blocks = kron(p0, *[I(control + i + 1) for i in range(n_qubits)]) + kron( + p1, shifted_unitary_block + ) + cu = tag(cu_blocks, f"c-U({control}, {shifted_unitary_block.qubit_support})") + + return cu + + +def _is_counter_list(lst: list[Counter]) -> bool: + return all(map(lambda x: isinstance(x, Counter), lst)) and isinstance(lst, list) + + +def _select_overlap_method( + method: OverlapMethod, + backend: BackendName, + bra_circuit: QuantumCircuit, + ket_circuit: QuantumCircuit, +) -> tuple[Callable, QuantumCircuit, QuantumCircuit]: + if method == OverlapMethod.EXACT: + fn = overlap_exact + + def _overlap_fn( + param_values: dict, + bra_calc_fn: Callable, + bra_state: Tensor | None, + ket_calc_fn: Callable, + ket_state: Tensor | None, + ) -> Tensor: + bras = bra_calc_fn(param_values["bra"], bra_state) + kets = ket_calc_fn(param_values["ket"], ket_state) + overlap = fn(bras, kets) + return overlap + + elif method == OverlapMethod.JENSEN_SHANNON: + + def _overlap_fn( + param_values: dict, + bra_calc_fn: Callable, + bra_state: Tensor | None, + ket_calc_fn: Callable, + ket_state: Tensor | None, + ) -> Tensor: + bras = bra_calc_fn(param_values["bra"], bra_state) + kets = ket_calc_fn(param_values["ket"], ket_state) + overlap = overlap_jensen_shannon(bras, kets) + return overlap + + elif method == OverlapMethod.COMPUTE_UNCOMPUTE: + # create a single circuit from bra and ket circuits + bra_circuit = QuantumCircuit( + bra_circuit.n_qubits, bra_circuit.block, ket_circuit.block.dagger() + ) + ket_circuit = None # type: ignore[assignment] + + def _overlap_fn( # type: ignore [misc] + param_values: dict, bra_calc_fn: Callable, bra_state: Tensor | None, *_: Any + ) -> Tensor: + bras = bra_calc_fn(param_values["bra"], bra_state) + overlap = overlap_compute_uncompute(bras) + return overlap + + elif method == OverlapMethod.SWAP_TEST: + if backend == BackendName.BRAKET: + raise ValueError("SWAP test method is not supported by the Braket backend.") + + n_qubits = bra_circuit.n_qubits + + # shift qubit support of bra and ket circuit blocks + shifted_bra_block = reassign(bra_circuit.block, {i: i + 1 for i in range(n_qubits)}) + shifted_ket_block = reassign( + ket_circuit.block, {i: i + n_qubits + 1 for i in range(n_qubits)} + ) + ket_circuit = None # type: ignore[assignment] + + # construct swap test circuit + state_blocks = kron(shifted_bra_block, shifted_ket_block) + cswap_blocks = chain(*[_cswap(0, n + 1, n + 1 + n_qubits) for n in range(n_qubits)]) + swap_test_blocks = chain(H(0), state_blocks, cswap_blocks, H(0)) + bra_circuit = QuantumCircuit(2 * n_qubits + 1, swap_test_blocks) + + def _overlap_fn( # type: ignore [misc] + param_values: dict, bra_calc_fn: Callable, bra_state: Tensor | None, *_: Any + ) -> Tensor: + bras = bra_calc_fn(param_values["bra"], bra_state) + overlap = overlap_swap_test(bras) + return overlap + + elif method == OverlapMethod.HADAMARD_TEST: + if backend == BackendName.BRAKET: + raise ValueError("Hadamard test method is not supported by the Braket backend.") + + n_qubits = bra_circuit.n_qubits + + # construct controlled bra and ket blocks + c_bra_block = _controlled_unitary(0, bra_circuit.block) + c_ket_block = _controlled_unitary(0, ket_circuit.block.dagger()) + + # construct swap test circuit for Re part + re_blocks = chain(H(0), c_bra_block, c_ket_block, H(0)) + bra_circuit = QuantumCircuit(n_qubits + 1, re_blocks) + + # construct swap test circuit for Im part + im_blocks = chain(H(0), c_bra_block, c_ket_block, S(0), H(0)) + ket_circuit = QuantumCircuit(n_qubits + 1, im_blocks) + + def _overlap_fn( + param_values: dict, + bra_calc_fn: Callable, + bra_state: Tensor | None, + ket_calc_fn: Callable, + ket_state: Tensor | None, + ) -> Tensor: + bras = bra_calc_fn(param_values["bra"], bra_state) + kets = ket_calc_fn(param_values["ket"], ket_state) + overlap = overlap_hadamard_test(bras, kets) + return overlap + + return _overlap_fn, bra_circuit, ket_circuit + + +def overlap_exact(bras: Tensor, kets: Tensor) -> Tensor: + """Calculate overlap using exact quantum mechanical definition. + + Args: + bras (Tensor): full bra wavefunctions + kets (Tensor): full ket wavefunctions + + Returns: + Tensor: overlap tensor containing values of overlap of each bra with each ket + """ + return torch.abs(torch.sum(bras.conj() * kets, dim=1)) ** 2 + + +def fidelity(bras: Tensor, kets: Tensor) -> Tensor: + return overlap_exact(bras, kets) + + +def overlap_jensen_shannon(bras: list[Counter], kets: list[Counter]) -> Tensor: + """Calculate overlap from bitstring counts using Jensen-Shannon divergence method. + + Args: + bras (list[Counter]): bitstring counts corresponding to bra wavefunctions + kets (list[Counter]): bitstring counts corresponding to ket wavefunctions + + Returns: + Tensor: overlap tensor containing values of overlap of each bra with each ket + """ + return 1 - torch.tensor([js_divergence(p, q) for p, q in zip(bras, kets)]) + + +def overlap_compute_uncompute(bras: Tensor | list[Counter]) -> Tensor: + """Calculate overlap using compute-uncompute method from full wavefunctions or + bitstring counts. + + Args: + bras (Tensor | list[Counter]): full bra wavefunctions or bitstring counts + + Returns: + Tensor: overlap tensor containing values of overlap of each bra with zeros ket + """ + if isinstance(bras, Tensor): + # calculate exact overlap of full bra wavefunctions with |0> state + overlap = torch.abs(bras[:, 0]) ** 2 + + elif isinstance(bras, list): + # estimate overlap as the fraction of shots when "0..00" bitstring was observed + n_qubits = len(list(bras[0].keys())[0]) + n_shots = sum(list(bras[0].values())) + overlap = torch.tensor([p["0" * n_qubits] / n_shots for p in bras]) + + return overlap + + +def overlap_swap_test(bras: Tensor | list[Counter]) -> Tensor: + """Calculate overlap using swap test method from full wavefunctions or + bitstring counts. + + Args: + bras (Tensor | list[Counter]): full bra wavefunctions or bitstring counts + + Returns: + Tensor: overlap tensor + """ + if isinstance(bras, Tensor): + n_qubits = int(np.log2(bras.shape[1])) + + # define measurement operator |0><0| x I + proj_op = torch.tensor([[1.0, 0.0], [0.0, 0.0]]) + ident_op = torch.diag(torch.tensor([1.0 for _ in range(2 ** (n_qubits - 1))])) + meas_op = torch.kron(proj_op, ident_op).type(torch.complex128) + + # estimate overlap from ancilla qubit measurement + prob0 = (bras.conj() * torch.matmul(meas_op, bras.t()).t()).sum(dim=1).real + + elif _is_counter_list(bras): + # estimate overlap as the fraction of shots when 0 was observed on ancilla qubit + n_qubits = len(list(bras[0].keys())[0]) + n_shots = sum(list(bras[0].values())) + prob0 = torch.tensor( + [ + sum(map(lambda k, v: v if k[0] == "0" else 0, p.keys(), p.values())) / n_shots + for p in bras + ] + ) + else: + raise TypeError("Incorrect type passed for bras argument.") + + # construct final overlap tensor + overlap = 2 * prob0 - 1 + + return overlap + + +def overlap_hadamard_test( + bras_re: Tensor | list[Counter], bras_im: Tensor | list[Counter] +) -> Tensor: + """Calculate overlap using Hadamard test method from full wavefunctions or + bitstring counts. + + Args: + bras_re (Tensor | list[Counter]): full bra wavefunctions or bitstring counts + for estimation of overlap's real part + bras_im (Tensor | list[Counter]): full bra wavefunctions or bitstring counts + for estimation of overlap's imaginary part + + Returns: + Tensor: overlap tensor + """ + if isinstance(bras_re, Tensor) and isinstance(bras_im, Tensor): + n_qubits = int(np.log2(bras_re.shape[1])) + + # define measurement operator |0><0| x I + proj_op = torch.tensor([[1.0, 0.0], [0.0, 0.0]]) + ident_op = torch.diag(torch.tensor([1.0 for _ in range(2 ** (n_qubits - 1))])) + meas_op = torch.kron(proj_op, ident_op).type(torch.complex128) + + # estimate overlap from ancilla qubit measurement + prob0_re = (bras_re * torch.matmul(meas_op, bras_re.conj().t()).t()).sum(dim=1).real + prob0_im = (bras_im * torch.matmul(meas_op, bras_im.conj().t()).t()).sum(dim=1).real + + elif _is_counter_list(bras_re) and _is_counter_list(bras_im): + # estimate overlap as the fraction of shots when 0 was observed on ancilla qubit + n_qubits = len(list(bras_re[0].keys())[0]) + n_shots = sum(list(bras_re[0].values())) + prob0_re = torch.tensor( + [ + sum(map(lambda k, v: v if k[0] == "0" else 0, p.keys(), p.values())) / n_shots + for p in bras_re + ] + ) + prob0_im = torch.tensor( + [ + sum(map(lambda k, v: v if k[0] == "0" else 0, p.keys(), p.values())) / n_shots + for p in bras_im + ] + ) + else: + raise TypeError("Incorrect types passed for bras_re and kets_re arguments.") + + # construct final overlap tensor + overlap = (2 * prob0_re - 1) ** 2 + (2 * prob0_im - 1) ** 2 + + return overlap + + +class Overlap(QuantumModel): + def __init__( + self, + bra_circuit: QuantumCircuit, + ket_circuit: QuantumCircuit, + backend: BackendName = BackendName.PYQTORCH, + diff_mode: DiffMode = DiffMode.AD, + protocol: Measurements | None = None, + configuration: BackendConfiguration | dict | None = None, + method: OverlapMethod = OverlapMethod.EXACT, + ): + self.backend_name = backend + self.method = method + + overlap_fn, bra_circuit, ket_circuit = _select_overlap_method( + method, backend, bra_circuit, ket_circuit + ) + self.overlap_fn = overlap_fn + + super().__init__( + bra_circuit, + backend=backend, + diff_mode=diff_mode, + protocol=protocol, + configuration=configuration, + ) + self.bra_feat_param_names = set([inp.name for inp in self.inputs]) + + if ket_circuit: + self.ket_model = QuantumModel( + ket_circuit, + backend=backend, + diff_mode=diff_mode, + protocol=protocol, + configuration=configuration, + ) + self.ket_feat_param_names = set([inp.name for inp in self.ket_model.inputs]) + else: + self.ket_model = None # type: ignore [assignment] + self.ket_feat_param_names = set([]) + + def _process_param_values( + self, bra_param_values: dict[str, Tensor], ket_param_values: dict[str, Tensor] + ) -> dict: + # we assume that either batch sizes are equal or 0 in case when no user params + # are present in bra/ket + bra_param_values = { + k: v.reshape(-1) if v.shape == () else v for k, v in bra_param_values.items() + } + batch_size_bra = ( + len(list(bra_param_values.values())[0]) if len(bra_param_values) != 0 else 0 + ) + ket_param_values = { + k: v.reshape(-1) if v.shape == () else v for k, v in ket_param_values.items() + } + batch_size_ket = ( + len(list(ket_param_values.values())[0]) if len(ket_param_values) != 0 else 0 + ) + new_bra_param_values = bra_param_values.copy() + new_ket_param_values = ket_param_values.copy() + + # if len(self.bra_feat_param_names) + len(self.ket_feat_param_names) <= 2: + + if len(self.bra_feat_param_names.union(self.ket_feat_param_names)) == 2: + # extend bra parameter tensors + for param_name in new_bra_param_values.keys(): + new_bra_param_values[param_name] = new_bra_param_values[param_name].repeat( + batch_size_ket + ) + + # extend ket parameter tensors + for param_name in new_ket_param_values.keys(): + idxs = torch.cat( + [ + torch.ones(batch_size_bra, dtype=torch.int64) * i + for i in range(batch_size_ket) + ] + ) + new_ket_param_values[param_name] = new_ket_param_values[param_name][idxs] + + if self.method in [OverlapMethod.EXACT, OverlapMethod.JENSEN_SHANNON]: + param_values = {"bra": new_bra_param_values, "ket": new_ket_param_values} + elif self.method in [ + OverlapMethod.COMPUTE_UNCOMPUTE, + OverlapMethod.SWAP_TEST, + OverlapMethod.HADAMARD_TEST, + ]: + # merge bra and ket param values to simulate all wavefunctions in one batch + new_bra_param_values.update(new_ket_param_values) + param_values = {"bra": new_bra_param_values} + if self.method == OverlapMethod.HADAMARD_TEST: + param_values["ket"] = new_bra_param_values + + elif len(self.bra_feat_param_names.union(self.ket_feat_param_names)) < 2: + if batch_size_bra == batch_size_ket or batch_size_bra == 0 or batch_size_ket == 0: + param_values = {"bra": bra_param_values, "ket": ket_param_values} + else: + raise ValueError("Batch sizes of both bra and ket parameters must be equal.") + + else: + raise ValueError("Multiple feature parameters for bra/ket are not currently supported.") + + return param_values + + def forward( # type: ignore [override] + self, + bra_param_values: dict[str, Tensor] = {}, + ket_param_values: dict[str, Tensor] = {}, + bra_state: Tensor | None = None, + ket_state: Tensor | None = None, + n_shots: int = 0, + ) -> Tensor: + # reformat parameters + param_values = self._process_param_values(bra_param_values, ket_param_values) + + # determine bra and ket calculation functions + if n_shots == 0: + bra_calc_fn = getattr(self, "run") + ket_calc_fn = getattr(self.ket_model, "run", None) + else: + + def bra_calc_fn(values: dict, state: Tensor) -> Any: + return getattr(self, "sample")(values, n_shots, state) + + def ket_calc_fn(values: dict, state: Tensor) -> Any: + return getattr(self.ket_model, "sample", lambda *_: _)(values, n_shots, state) + + # calculate overlap + overlap = self.overlap_fn( + param_values, bra_calc_fn, bra_state, ket_calc_fn, ket_state # type: ignore [arg-type] + ) + + # reshape output if needed + if len(self.bra_feat_param_names.union(self.ket_feat_param_names)) < 2: + overlap = overlap[:, None] + else: + batch_size_bra = max(len(list(bra_param_values.values())[0]), 1) + batch_size_ket = max(len(list(ket_param_values.values())[0]), 1) + overlap = overlap.reshape((batch_size_ket, batch_size_bra)).t() + + return overlap diff --git a/qadence/py.typed b/qadence/py.typed new file mode 100644 index 00000000..e69de29b diff --git a/qadence/qubit_support.py b/qadence/qubit_support.py new file mode 100644 index 00000000..b7a0f0fd --- /dev/null +++ b/qadence/qubit_support.py @@ -0,0 +1,61 @@ +from __future__ import annotations + +from typing import Any, Union + +from qadence.types import QubitSupportType + + +def _is_valid_support(t: Any) -> bool: + return isinstance(t, tuple) and all(i >= 0 for i in t) + + +class QubitSupport(tuple): + def __new__(cls, *support: Union[QubitSupportType, str, int, tuple]) -> QubitSupport: + if len(support) == 1: + if isinstance(support[0], tuple): + return QubitSupport(*support[0]) + if support[0] == "global": + support = (QubitSupportType.GLOBAL,) + valid = True + elif support[0] >= 0: # type: ignore[operator] + valid = True + else: + valid = False + else: + valid = _is_valid_support(support) + + if not valid: + raise ValueError( + "QubitSupport can be a tuple of ints or 'global'. For example:ℕn" + "QubitSupport(1,2,3) or QubitSupport('global')\n" + f"Found: {support}" + ) + return super(QubitSupport, cls).__new__(cls, support) # type: ignore[arg-type] + + def __add__(self, other: Any) -> QubitSupport: + if not isinstance(other, tuple): + raise ValueError(f"Cannot add type '{type(other)}' to QubitSupport.") + if self == other: + return self + elif self == ("global",): + return QubitSupport(*range(max(other) + 1)) if len(other) else QubitSupport("global") + elif other == ("global",): + return QubitSupport(*range(max(self) + 1)) if len(self) else QubitSupport("global") + else: + return QubitSupport(tuple({*self, *other})) + + def __radd__(self, other: Any) -> QubitSupport: + return self.__add__(other) + + @property + def is_global(self) -> bool: + return self == ("global",) + + def is_disjoint(self, other: Any) -> bool: + oth = QubitSupport(other) + if self.is_global or oth.is_global: + return False + else: + selfsup = set(self) + othersup = set(oth) + return selfsup.isdisjoint(othersup) diff --git a/qadence/register.py b/qadence/register.py new file mode 100644 index 00000000..ec0b2447 --- /dev/null +++ b/qadence/register.py @@ -0,0 +1,223 @@ +from __future__ import annotations + +from copy import deepcopy +from typing import Any + +import matplotlib.pyplot as plt +import networkx as nx +import numpy as np +from deepdiff import DeepDiff +from networkx.classes.reportviews import EdgeView, NodeView + +from qadence.types import LatticeTopology + +# Modules to be automatically added to the qadence namespace +__all__ = ["Register"] + + +def _scale_node_positions(graph: nx.Graph, scale: float) -> None: + scaled_nodes = {} + for k, node in graph.nodes.items(): + (x, y) = node["pos"] + scaled_nodes[k] = {"pos": (x * scale, y * scale)} + nx.set_node_attributes(graph, scaled_nodes) + + +class Register: + def __init__(self, support: nx.Graph | int): + """A 2D register of qubits which includes their coordinates (needed for e.g. analog + computing). The coordinates are ignored in backends that don't need them. The easiest + way to construct a register is via its classmethods like `Register.triangular_lattice`. + + Arguments: + support: A graph or number of qubits. Nodes can include a `"pos"` attribute + such that e.g.: `graph.nodes = {0: {"pos": (2,3)}, 1: {"pos": (0,0)}, ...}` which + will be used in backends that need qubit coordinates. + See the classmethods for simple construction of some predefined lattices if you + don't want to build a graph manually. + If you pass an integer the resulting register is the same as + `Register.all_to_all(n_qubits)`. + + Examples: + ```python exec="on" source="material-block" result="json" + from qadence import Register + + reg = Register.honeycomb_lattice(2,3) + reg.draw() + ``` + """ + self.graph = support if isinstance(support, nx.Graph) else alltoall_graph(support) + + @property + def n_qubits(self) -> int: + return len(self.graph) + + @classmethod + def from_coordinates( + cls, coords: list[tuple], lattice: LatticeTopology | str = LatticeTopology.ARBITRARY + ) -> Register: + graph = nx.Graph() + for i, pos in enumerate(coords): + graph.add_node(i, pos=pos) + return cls(graph) + + @classmethod + def line(cls, n_qubits: int) -> Register: + return cls(line_graph(n_qubits)) + + @classmethod + def circle(cls, n_qubits: int, scale: float = 1.0) -> Register: + graph = nx.grid_2d_graph(n_qubits, 1, periodic=True) + graph = nx.relabel_nodes(graph, {(i, 0): i for i in range(n_qubits)}) + coords = nx.circular_layout(graph) + values = {i: {"pos": pos} for i, pos in coords.items()} + nx.set_node_attributes(graph, values) + _scale_node_positions(graph, scale) + return cls(graph) + + @classmethod + def square(cls, qubits_side: int, scale: float = 1.0) -> Register: + n_points = 4 * (qubits_side - 1) + + def gen_points() -> np.ndarray: + rotate_left = np.array([[0.0, -1.0], [1.0, 0.0]]) + increment = np.array([0.0, 1.0]) + + points = [np.array([0.0, 0.0])] + counter = 1 + while len(points) < n_points: + points.append(points[-1] + increment) + + counter = (counter + 1) % qubits_side + if counter == 0: + increment = rotate_left.dot(increment) + counter = 1 + points = np.array(points) # type: ignore[assignment] + points -= np.mean(points, axis=0) + + return points # type: ignore[return-value] + + graph = nx.grid_2d_graph(n_points, 1, periodic=True) + graph = nx.relabel_nodes(graph, {(i, 0): i for i in range(n_points)}) + values = {i: {"pos": point} for i, point in zip(graph.nodes, gen_points())} + nx.set_node_attributes(graph, values) + _scale_node_positions(graph, scale) + return cls(graph) + + @classmethod + def all_to_all(cls, n_qubits: int) -> Register: + return cls(alltoall_graph(n_qubits)) + + @classmethod + def rectangular_lattice( + cls, qubits_row: int, qubits_col: int, side_length: float = 1.0 + ) -> Register: + graph = nx.grid_2d_graph(qubits_col, qubits_row) + values = {i: {"pos": node} for (i, node) in enumerate(graph.nodes)} + graph = nx.relabel_nodes(graph, {(i, j): k for k, (i, j) in enumerate(graph.nodes)}) + nx.set_node_attributes(graph, values) + _scale_node_positions(graph, side_length) + return cls(graph) + + @classmethod + def triangular_lattice( + cls, n_cells_row: int, n_cells_col: int, side_length: float = 1.0 + ) -> Register: + return cls(triangular_lattice_graph(n_cells_row, n_cells_col, side_length)) + + @classmethod + def honeycomb_lattice(cls, n_cells_row: int, n_cells_col: int, scale: float = 1.0) -> Register: + graph = nx.hexagonal_lattice_graph(n_cells_row, n_cells_col) + graph = nx.relabel_nodes(graph, {(i, j): k for k, (i, j) in enumerate(graph.nodes)}) + _scale_node_positions(graph, scale) + return cls(graph) + + @classmethod + def lattice(cls, topology: LatticeTopology | str, *args: Any, **kwargs: Any) -> Register: + return getattr(cls, topology)(*args, **kwargs) # type: ignore[no-any-return] + + def draw(self, show: bool = True) -> None: + coords = {i: n["pos"] for i, n in self.graph.nodes.items()} + nx.draw(self.graph, with_labels=True, pos=coords) + if show: + plt.gcf().show() + + def __getitem__(self, item: int) -> Any: + return self.graph.nodes[item] + + @property + def support(self) -> set: + return set(self.graph.nodes) + + @property + def coords(self) -> dict: + return {i: tuple(node.get("pos", ())) for i, node in self.graph.nodes.items()} + + @property + def edges(self) -> EdgeView: + return self.graph.edges + + @property + def nodes(self) -> NodeView: + return self.graph.nodes + + def _scale_positions(self, scale: float) -> Register: + g = deepcopy(self.graph) + _scale_node_positions(g, scale) + return Register(g) + + def _to_dict(self) -> dict: + return {"graph": nx.node_link_data(self.graph)} + + @classmethod + def _from_dict(cls, d: dict) -> Register: + return cls(nx.node_link_graph(d["graph"])) + + def __eq__(self, other: object) -> bool: + if not isinstance(other, Register): + return False + return ( + DeepDiff(self.coords, other.coords, ignore_order=False) == {} + and nx.is_isomorphic(self.graph, other.graph) + and self.n_qubits == other.n_qubits + ) + + +def line_graph(n_qubits: int, spacing: float = 1.0) -> nx.Graph: + """Create graph representing linear lattice. + + Args: + n_qubits (int): number of nodes in the graph + + Returns: + graph instance + """ + graph = nx.Graph() + for i in range(n_qubits): + graph.add_node(i, pos=(i * spacing, 0.0)) + for i, j in zip(range(n_qubits - 1), range(1, n_qubits)): + graph.add_edge(i, j) + return graph + + +def triangular_lattice_graph( + n_cells_row: int, n_cells_col: int, side_length: float = 1.0 +) -> nx.Graph: + graph = nx.triangular_lattice_graph(n_cells_row, n_cells_col) + graph = nx.relabel_nodes(graph, {(i, j): k for k, (i, j) in enumerate(graph.nodes)}) + _scale_node_positions(graph, side_length) + return graph + + +def alltoall_graph(n_qubits: int) -> nx.Graph: + if n_qubits == 2: + return line_graph(2) + elif n_qubits == 3: + return triangular_lattice_graph(1, 1) + + graph = nx.complete_graph(n_qubits) + # set seed to make sure the produced graphs are reproducible + coords = nx.spring_layout(graph, seed=0) + for i, pos in coords.items(): + graph.nodes[i]["pos"] = tuple(pos) + return graph diff --git a/qadence/serialization.py b/qadence/serialization.py new file mode 100644 index 00000000..b67ec5e5 --- /dev/null +++ b/qadence/serialization.py @@ -0,0 +1,352 @@ +from __future__ import annotations + +import json +import os +from pathlib import Path +from typing import Any, get_args +from typing import Union as TypingUnion + +import torch +from sympy import * +from sympy import Basic, Expr, srepr + +from qadence import QuantumCircuit, operations +from qadence import blocks as qadenceblocks +from qadence.blocks import AbstractBlock +from qadence.blocks.utils import tag +from qadence.logger import get_logger +from qadence.ml_tools.models import TransformedModule +from qadence.models import QNN, QuantumModel +from qadence.parameters import Parameter +from qadence.register import Register +from qadence.types import SerializationFormat + +# Modules to be automatically added to the qadence namespace +__all__ = ["deserialize", "load", "save", "serialize"] + + +logger = get_logger(__name__) + + +def file_extension(file: Path | str) -> str: + FORMAT = "" + if isinstance(file, str): + _, extension = os.path.splitext(file) + FORMAT = extension[1:].upper() + elif isinstance(file, os.PathLike): + _, extension = os.path.splitext(str(file)) + FORMAT = extension[1:].upper() + return FORMAT + + +SUPPORTED_OBJECTS = [ + AbstractBlock, + QuantumCircuit, + QuantumModel, + QNN, + TransformedModule, + Register, + Basic, + torch.nn.Module, +] +SUPPORTED_TYPES = TypingUnion[ + AbstractBlock, + QuantumCircuit, + QuantumModel, + QNN, + TransformedModule, + Register, + Basic, + torch.nn.Module, +] + + +ALL_BLOCK_NAMES = [ + n for n in dir(qadenceblocks) if not (n.startswith("__") and n.endswith("__")) +] + [n for n in dir(operations) if not (n.startswith("__") and n.endswith("__"))] + + +def save_pt(d: dict, file_path: str | Path) -> None: + torch.save(d, file_path) + + +def save_json(d: dict, file_path: str | Path) -> None: + with open(file_path, "w") as file: + file.write(json.dumps(d)) + + +def load_pt(file_path: str | Path, map_location: str) -> Any: + return torch.load(file_path, map_location=map_location) + + +def load_json(file_path: str | Path, map_location: str) -> Any: + with open(file_path, "r") as file: + return json.load(file) + + +FORMAT_DICT = { + SerializationFormat.PT: (".pt", save_pt, load_pt, True), + SerializationFormat.JSON: (".json", save_json, load_json, False), +} + + +def serialize(obj: SUPPORTED_TYPES, save_params: bool = False) -> dict: + """ + Supported Types: + AbstractBlock | QuantumCircuit | QuantumModel | TransformedModule | Register | Module + Serializes a qadence object to a dictionary. + + Arguments: + obj (AbstractBlock | QuantumCircuit | QuantumModel | Register | Module): + Returns: + A dict. + + Examples: + ```python exec="on" source="material-block" result="json" + import torch + from qadence import serialize, deserialize, hea, total_magnetization + from qadence import QuantumCircuit, QuantumModel + + n_qubits = 2 + myblock = hea(n_qubits=n_qubits, depth=1) + block_dict = serialize(myblock) + print(block_dict) + + ## Lets use myblock in a QuantumCircuit and serialize it. + + qc = QuantumCircuit(n_qubits, myblock) + qc_dict = serialize(qc) + qc_deserialized = deserialize(qc_dict) + assert qc == qc_deserialized + + ## Finally, let's wrap it in a QuantumModel + obs = total_magnetization(n_qubits) + qm = QuantumModel(qc, obs, backend='pyqtorch', diff_mode='ad') + + qm_dict = serialize(qm) + qm_deserialized = deserialize(qm_dict) + # Lets check if the loaded QuantumModel returns the same expectation + assert torch.isclose(qm.expectation({}), qm_deserialized.expectation({})) + ``` + """ + if not isinstance(obj, get_args(SUPPORTED_TYPES)): + logger.error(TypeError(f"Serialization of object type {type(obj)} not supported.")) + d: dict = {} + try: + if isinstance(obj, Expr): + symb_dict = {} + expr_dict = {"name": str(obj), "expression": srepr(obj)} + symbs: set[Parameter | Basic] = obj.free_symbols + if symbs: + symb_dict = {"symbols": {str(s): s._to_dict() for s in symbs}} + d = {**expr_dict, **symb_dict} + elif isinstance(obj, (QuantumModel, QNN, TransformedModule)): + d = obj._to_dict(save_params) + elif isinstance(obj, torch.nn.Module): + d = {type(obj).__name__: obj.state_dict()} + else: + d = obj._to_dict() + except Exception as e: + logger.error(f"Serialization of object {obj} failed due to {e}") + return d + + +def deserialize(d: dict, as_torch: bool = False) -> SUPPORTED_TYPES: + """ + Supported Types: + AbstractBlock | QuantumCircuit | QuantumModel | TransformedModule | Register | Module + Deserializes a dict to one of the supported types. + + Arguments: + d (dict): A dict containing a serialized object. + Returns: + AbstractBlock, QuantumCircuit, QuantumModel, TransformedModule, Register, Module. + + Examples: + ```python exec="on" source="material-block" result="json" + import torch + from qadence import serialize, deserialize, hea, total_magnetization + from qadence import QuantumCircuit, QuantumModel + + n_qubits = 2 + myblock = hea(n_qubits=n_qubits, depth=1) + block_dict = serialize(myblock) + print(block_dict) + + ## Lets use myblock in a QuantumCircuit and serialize it. + + qc = QuantumCircuit(n_qubits, myblock) + qc_dict = serialize(qc) + qc_deserialized = deserialize(qc_dict) + assert qc == qc_deserialized + + ## Finally, let's wrap it in a QuantumModel + obs = total_magnetization(n_qubits) + qm = QuantumModel(qc, obs, backend='pyqtorch', diff_mode='ad') + + qm_dict = serialize(qm) + qm_deserialized = deserialize(qm_dict) + # Lets check if the loaded QuantumModel returns the same expectation + assert torch.isclose(qm.expectation({}), qm_deserialized.expectation({})) + ``` + """ + obj: Any + if d.get("expression"): + expr = eval(d["expression"]) + if hasattr(expr, "free_symbols"): + for symb in expr.free_symbols: + symb.value = float(d["symbols"][symb.name]["value"]) + obj = expr + elif d.get("QuantumModel"): + obj = QuantumModel._from_dict(d, as_torch) + elif d.get("QNN"): + obj = QNN._from_dict(d, as_torch) + elif d.get("TransformedModule"): + obj = TransformedModule._from_dict(d, as_torch) + elif d.get("block") and d.get("register"): + obj = QuantumCircuit._from_dict(d) + elif d.get("graph"): + obj = Register._from_dict(d) + elif d.get("type"): + if d["type"] in ALL_BLOCK_NAMES: + block: AbstractBlock = ( + getattr(operations, d["type"])._from_dict(d) + if hasattr(operations, d["type"]) + else getattr(qadenceblocks, d["type"])._from_dict(d) + ) + if d["tag"] is not None: + block = tag(block, d["tag"]) + obj = block + else: + import warnings + + msg = warnings.warn( + "In order to load a custom torch.nn.Module, make sure its imported in the namespace." + ) + try: + module_name = list(d.keys())[0] + obj = getattr(globals(), module_name) + obj.load_state_dict(d[module_name]) + except Exception as e: + logger.error( + TypeError( + f"{msg}. Unable to deserialize object due to {e}.\ + Supported objects are: {SUPPORTED_OBJECTS}" + ) + ) + return obj + + +def save( + obj: SUPPORTED_TYPES, + folder: str | Path, + file_name: str = "", + format: SerializationFormat = SerializationFormat.JSON, +) -> None: + """ + Same as serialize/deserialize but for storing/loading files. + Supported types: + AbstractBlock | QuantumCircuit | QuantumModel | TransformedModule | Register | torch.nn.Module + Saves a qadence object to a json/.pt. + + Arguments: + obj (AbstractBlock | QuantumCircuit | QuantumModel | Register): + Either AbstractBlock, QuantumCircuit, QuantumModel, TransformedModule, Register. + file_name (str): The name of the file. + format (str): The type of file to save. + Returns: + None. + + Examples: + ```python exec="on" source="material-block" result="json" + import torch + from pathlib import Path + import os + + from qadence import save, load, hea, total_magnetization + from qadence import QuantumCircuit, QuantumModel + + n_qubits = 2 + myblock = hea(n_qubits=n_qubits, depth=1) + qc = QuantumCircuit(n_qubits, myblock) + # Lets store the circuit in a json file + save(qc, '.', 'circ') + loaded_qc = load(Path('circ.json')) + qc == loaded_qc + os.remove('circ.json') + ## Let's wrap it in a QuantumModel and store that + obs = total_magnetization(n_qubits) + qm = QuantumModel(qc, obs, backend='pyqtorch', diff_mode='ad') + save(qm, folder= '.',file_name= 'quantum_model') + qm_loaded = load('quantum_model.json') + os.remove('quantum_model.json') + ``` + """ + if not isinstance(obj, get_args(SUPPORTED_TYPES)): + logger.error(f"Serialization of object type {type(obj)} not supported.") + folder = Path(folder) + if not folder.is_dir(): + logger.error(NotADirectoryError) + if file_name == "": + file_name = type(obj).__name__ + try: + suffix, save_fn, _, save_params = FORMAT_DICT[format] + d = serialize(obj, save_params) + file_path = folder / Path(file_name + suffix) + save_fn(d, file_path) + logger.debug(f"Successfully saved {obj} from to {folder}.") + except Exception as e: + logger.error(f"Unable to write {type(obj)} to disk due to {e}") + + +def load(file_path: str | Path, map_location: str = "cpu") -> SUPPORTED_TYPES: + """ + Same as serialize/deserialize but for storing/loading files. + Supported types: AbstractBlock | QuantumCircuit | QuantumModel | TransformedModule | Register + Loads a .json or .pt file to one of the supported types. + + Arguments: + file_path (str): The name of the file. + map_location (str): In case of a .pt file, on which device to load the object (cpu,cuda). + Returns: + A object of type AbstractBlock, QuantumCircuit, QuantumModel, TransformedModule, Register. + + Examples: + ```python exec="on" source="material-block" result="json" + import torch + from pathlib import Path + import os + + from qadence import save, load, hea, total_magnetization + from qadence import QuantumCircuit, QuantumModel + + n_qubits = 2 + myblock = hea(n_qubits=n_qubits, depth=1) + qc = QuantumCircuit(n_qubits, myblock) + # Lets store the circuit in a json file + save(qc, '.', 'circ') + loaded_qc = load(Path('circ.json')) + qc == loaded_qc + os.remove('circ.json') + ## Let's wrap it in a QuantumModel and store that + obs = total_magnetization(n_qubits) + qm = QuantumModel(qc, obs, backend='pyqtorch', diff_mode='ad') + save(qm, folder= '.',file_name= 'quantum_model') + qm_loaded = load('quantum_model.json') + os.remove('quantum_model.json') + ``` + """ + d = {} + if isinstance(file_path, str): + file_path = Path(file_path) + if not os.path.exists(file_path): + logger.error(f"File {file_path} not found.") + raise FileNotFoundError + FORMAT = file_extension(file_path) + _, _, load_fn, _ = FORMAT_DICT[FORMAT] # type: ignore[index] + try: + d = load_fn(file_path, map_location) + logger.debug(f"Successfully loaded {d} from {file_path}.") + except Exception as e: + logger.error(f"Unable to load Object from {file_path} due to {e}") + return deserialize(d) diff --git a/qadence/states.py b/qadence/states.py new file mode 100644 index 00000000..89982a30 --- /dev/null +++ b/qadence/states.py @@ -0,0 +1,557 @@ +from __future__ import annotations + +import random +from functools import singledispatch +from typing import List + +import torch +from torch import Tensor, concat +from torch.distributions import Categorical, Distribution + +from qadence import BackendName +from qadence.backends.api import backend_factory +from qadence.blocks import ChainBlock, KronBlock, PrimitiveBlock, chain, kron +from qadence.circuit import QuantumCircuit +from qadence.operations import CNOT, RX, RY, RZ, H, I, X +from qadence.overlap import fidelity +from qadence.types import Endianness, StateGeneratorType +from qadence.utils import basis_to_int + +# Modules to be automatically added to the qadence namespace +__all__ = [ + "uniform_state", + "zero_state", + "one_state", + "product_state", + "rand_product_state", + "ghz_state", + "random_state", + "uniform_block", + "one_block", + "zero_block", + "product_block", + "rand_product_block", + "ghz_block", + "pmf", + "normalize", + "is_normalized", + "rand_bitstring", + "equivalent_state", +] + +ATOL_64 = 1e-14 # 64 bit precision +NORMALIZATION_ATOL = ATOL_64 +DTYPE = torch.cdouble + +parametric_single_qubit_gates: List = [RX, RY, RZ] + +# PRIVATE + + +def _rand_haar_fast(n_qubits: int) -> Tensor: + # inspired by https://qiskit.org/documentation/_modules/qiskit/quantum_info/states/random.html#random_statevector + N = 2**n_qubits + x = -torch.log(torch.rand(N)) + sumx = torch.sum(x) + phases = torch.rand(N) * 2.0 * torch.pi + return (torch.sqrt(x / sumx) * torch.exp(1j * phases)).reshape(1, N) + + +def _rand_haar_slow(n_qubits: int) -> Tensor: + """ + Detailed in https://arxiv.org/pdf/math-ph/0609050.pdf + + Textbook implementation, but very expensive. For 12 qubits it takes several seconds. + For 1 qubit it seems to produce the same distribution as the measure above. + """ + N = 2**n_qubits + A = torch.zeros(N, N, dtype=DTYPE).normal_(0, 1) + B = torch.zeros(N, N, dtype=DTYPE).normal_(0, 1) + Z = A + 1.0j * B + Q, R = torch.linalg.qr(Z) + Lambda = torch.diag(torch.diag(R) / torch.diag(R).abs()) + haar_unitary = torch.matmul(Q, Lambda) + return torch.matmul(haar_unitary, zero_state(n_qubits).squeeze(0)).unsqueeze(0) + + +@singledispatch +def _run_state(circ: QuantumCircuit, backend: str) -> Tensor: + if backend != BackendName.PYQTORCH: + raise ValueError("Only pyqtorch supports custom states.") + bknd = backend_factory(backend=backend, diff_mode="ad") + conv = bknd.convert(circ) + return bknd.run(conv.circuit, conv.embedding_fn(conv.params, {})) + + +@_run_state.register +def _(circs: list, backend: str) -> Tensor: # type: ignore[misc] + bknd = backend_factory(backend=backend, diff_mode="ad") + results = () + for c in circs: + conv = bknd.convert(c) + results += ( + bknd.run(conv.circuit, conv.embedding_fn(conv.params, {})), + ) # type:ignore[assignment] + return concat(results, dim=0) + + +def _from_op(op: type[PrimitiveBlock], n_qubits: int) -> KronBlock: + return kron(op(i) for i in range(n_qubits)) # type: ignore[arg-type] + + +def _block_from_bitstring(bitstring: str) -> KronBlock: + n_qubits = len(bitstring) + gates = [] + for i, b in zip(range(n_qubits), bitstring): + gates.append(X(i)) if b == "1" else gates.append(I(i)) # type: ignore[arg-type] + return kron(*gates) + + +def _state_from_bitstring( + bitstring: str, batch_size: int, endianness: Endianness = Endianness.BIG +) -> Tensor: + n_qubits = len(bitstring) + wf_batch = torch.zeros(batch_size, 2**n_qubits, dtype=DTYPE) + k = basis_to_int(basis=bitstring, endianness=endianness) + wf_batch[:, k] = torch.tensor(1.0 + 0j, dtype=DTYPE) + return wf_batch + + +def _abstract_random_state( + n_qubits: int, batch_size: int = 1 +) -> QuantumCircuit | list[QuantumCircuit]: + qc_list = [] + for i in range(batch_size): + gates_list = [] + for i in range(n_qubits): + gate = parametric_single_qubit_gates[ + random.randrange(len(parametric_single_qubit_gates)) + ] + angle = random.uniform(-2, 2) + gates_list.append(gate(i, angle)) + qc_list.append(QuantumCircuit(n_qubits, chain(*gates_list))) + return qc_list[0] if batch_size == 1 else qc_list + + +# STATES + + +def uniform_state(n_qubits: int, batch_size: int = 1) -> Tensor: + """ + Generates the uniform state for a specified number of qubits. + + Arguments: + n_qubits (int): The number of qubits. + batch_size (int): The batch size. + + Returns: + A torch.Tensor. + + Examples: + ```python exec="on" source="material-block" result="json" + from qadence.states import uniform_state + + state = uniform_state(n_qubits=2) + print(state) + ``` + """ + norm = 1 / torch.sqrt(torch.tensor(2**n_qubits)) + return norm * torch.ones(batch_size, 2**n_qubits, dtype=DTYPE) + + +def zero_state(n_qubits: int, batch_size: int = 1) -> Tensor: + """ + Generates the zero state for a specified number of qubits. + + Arguments: + n_qubits (int): The number of qubits for which the zero state is to be generated. + batch_size (int): The batch size for the zero state. + + Returns: + A torch.Tensor. + + Examples: + ```python exec="on" source="material-block" result="json" + from qadence.states import zero_state + + state = zero_state(n_qubits=2) + print(state) + ``` + """ + bitstring = "0" * n_qubits + return _state_from_bitstring(bitstring, batch_size) + + +def one_state(n_qubits: int, batch_size: int = 1) -> Tensor: + """ + Generates the one state for a specified number of qubits. + + Arguments: + n_qubits (int): The number of qubits. + batch_size (int): The batch size. + + Returns: + A torch.Tensor. + + Examples: + ```python exec="on" source="material-block" result="json" + from qadence.states import one_state + + state = one_state(n_qubits=2) + print(state) + ``` + """ + bitstring = "1" * n_qubits + return _state_from_bitstring(bitstring, batch_size) + + +@singledispatch +def product_state( + bitstring: str, batch_size: int = 1, endianness: Endianness = Endianness.BIG +) -> Tensor: + """ + Creates a product state from a bitstring. + + Arguments: + bitstring (str): A bitstring. + batch_size (int) : Batch size. + + Returns: + A torch.Tensor. + + Examples: + ```python exec="on" source="material-block" result="json" + from qadence.states import product_state + + print(product_state("1100")) + ``` + """ + return _state_from_bitstring(bitstring, batch_size, endianness=endianness) + + +@product_state.register +def _(bitstrings: list) -> Tensor: # type: ignore + return concat(tuple(product_state(b) for b in bitstrings), dim=0) + + +def rand_product_state(n_qubits: int, batch_size: int = 1) -> Tensor: + """ + Creates a random product state. + + Arguments: + n_qubits (int): The number of qubits. + batch_size (int): How many bitstrings to use. + + Returns: + A torch.Tensor. + + Examples: + ```python exec="on" source="material-block" result="json" + from qadence.states import rand_product_state + + print(rand_product_state(n_qubits=2, batch_size=2)) + ``` + """ + wf_batch = torch.zeros(batch_size, 2**n_qubits, dtype=DTYPE) + rand_pos = torch.randint(0, 2**n_qubits, (batch_size,)) + wf_batch[torch.arange(batch_size), rand_pos] = torch.tensor(1.0 + 0j, dtype=DTYPE) + return wf_batch + + +def ghz_state(n_qubits: int, batch_size: int = 1) -> Tensor: + """ + Creates a GHZ state. + + Arguments: + n_qubits (int): The number of qubits. + batch_size (int): How many bitstrings to use. + + Returns: + A torch.Tensor. + + Examples: + ```python exec="on" source="material-block" result="json" + from qadence.states import ghz_state + + print(ghz_state(n_qubits=2, batch_size=2)) + ``` + """ + norm = 1 / torch.sqrt(torch.tensor(2)) + return norm * (zero_state(n_qubits, batch_size) + one_state(n_qubits, batch_size)) + + +def random_state( + n_qubits: int, + batch_size: int = 1, + backend: str = BackendName.PYQTORCH, + type: StateGeneratorType = StateGeneratorType.HAAR_MEASURE_FAST, +) -> Tensor: + """ + Generates a random state for a specified number of qubits. + + Arguments: + n_qubits (int): The number of qubits. + backend (str): The backend to use. + batch_size (int): The batch size. + type : StateGeneratorType. + + Returns: + A torch.Tensor. + + Examples: + ```python exec="on" source="material-block" result="json" + from qadence.states import random_state, StateGeneratorType + from qadence.states import random_state, is_normalized, pmf + from qadence.backend import BackendName + from torch.distributions import Distribution + + ### We have the following options: + print([g.value for g in StateGeneratorType]) + + + n_qubits = 2 + # The default is StateGeneratorType.HAARMEASUREFAST + state = random_state(n_qubits=n_qubits) + print(state) + + ### Lets initialize a state using random rotations, i.e., StateGeneratorType.RANDOM_ROTATIONS. + random = random_state(n_qubits=n_qubits, type=StateGeneratorType.RANDOM_ROTATIONS) + print(random) + ``` + """ + + if type == StateGeneratorType.HAAR_MEASURE_FAST: + state = concat(tuple(_rand_haar_fast(n_qubits) for _ in range(batch_size)), dim=0) + elif type == StateGeneratorType.HAAR_MEASURE_FAST: + state = concat(tuple(_rand_haar_slow(n_qubits) for _ in range(batch_size)), dim=0) + elif type == StateGeneratorType.RANDOM_ROTATIONS: + state = _run_state(_abstract_random_state(n_qubits, batch_size), backend) # type: ignore + assert all(list(map(is_normalized, state))) + return state + + +# BLOCKS + + +def uniform_block(n_qubits: int) -> KronBlock: + """ + Generates the abstract uniform state for a specified number of qubits. + + Arguments: + n_qubits (int): The number of qubits. + + Returns: + A KronBlock representing the uniform state. + + Examples: + ```python exec="on" source="material-block" result="json" + from qadence.states import uniform_block + + block = uniform_block(n_qubits=2) + print(block) + ``` + """ + return _from_op(H, n_qubits=n_qubits) + + +def one_block(n_qubits: int) -> KronBlock: + """ + Generates the abstract one state for a specified number of qubits. + + Arguments: + n_qubits (int): The number of qubits. + + Returns: + A KronBlock representing the one state. + + Examples: + ```python exec="on" source="material-block" result="json" + from qadence.states import one_block + + block = one_block(n_qubits=2) + print(block) + ``` + """ + return _from_op(X, n_qubits=n_qubits) + + +def zero_block(n_qubits: int) -> KronBlock: + """ + Generates the abstract zero state for a specified number of qubits. + + Arguments: + n_qubits (int): The number of qubits. + + Returns: + A KronBlock representing the zero state. + + Examples: + ```python exec="on" source="material-block" result="json" + from qadence.states import zero_block + + block = zero_block(n_qubits=2) + print(block) + ``` + """ + return _from_op(I, n_qubits=n_qubits) + + +def product_block(bitstring: str) -> KronBlock: + """ + Creates an abstract product state from a bitstring. + + Arguments: + bitstring (str): A bitstring. + + Returns: + A KronBlock representing the product state. + + Examples: + ```python exec="on" source="material-block" result="json" + from qadence.states import product_block + + print(product_block("1100")) + ``` + """ + return _block_from_bitstring(bitstring) + + +def rand_product_block(n_qubits: int) -> KronBlock: + """ + Creates a block representing a random abstract product state. + + Arguments: + n_qubits (int): The number of qubits. + + Returns: + A KronBlock representing the product state. + + Examples: + ```python exec="on" source="material-block" result="json" + from qadence.states import rand_product_block + + print(rand_product_block(n_qubits=2)) + ``` + """ + return product_block(rand_bitstring(n_qubits)) + + +def ghz_block(n_qubits: int) -> ChainBlock: + """ + Generates the abstract ghz state for a specified number of qubits. + + Arguments: + n_qubits (int): The number of qubits. + + Returns: + A ChainBlock representing the GHZ state. + + Examples: + ```python exec="on" source="material-block" result="json" + from qadence.states import ghz_block + + block = ghz_block(n_qubits=2) + print(block) + ``` + """ + cnots = chain(CNOT(i - 1, i) for i in range(1, n_qubits)) + return chain(H(0), cnots) + + +# UTILITIES + + +def pmf(wf: Tensor) -> Distribution: + """ + Converts a wave function into a torch Distribution. + + Arguments: + wf (torch.Tensor): The wave function as a torch tensor. + + Returns: + A torch.distributions.Distribution. + + Examples: + ```python exec="on" source="material-block" result="json" + from qadence.states import uniform_state, pmf + + print(pmf(uniform_state(2)).probs) + ``` + """ + return Categorical(torch.abs(torch.pow(wf, 2))) + + +def normalize(wf: Tensor) -> Tensor: + """ + Normalizes a wavefunction or batch of wave functions. + + Arguments: + wf (torch.Tensor): Normalized wavefunctions. + + Returns: + A torch.Tensor. + + Examples: + ```python exec="on" source="material-block" result="json" + from qadence.states import uniform_state, normalize + + print(normalize(uniform_state(2, 2))) + ``` + """ + if wf.dim() == 1: + return wf / torch.sqrt((wf.abs() ** 2).sum()) + else: + return wf / torch.sqrt((wf.abs() ** 2).sum(1)).unsqueeze(1) + + +def is_normalized(wf: Tensor, atol: float = NORMALIZATION_ATOL) -> bool: + """ + Checks if a wave function is normalized. + + Arguments: + wf (torch.Tensor): The wave function as a torch tensor. + atol (float) : The tolerance. + + Returns: + A bool. + + Examples: + ```python exec="on" source="material-block" result="json" + from qadence.states import uniform_state, is_normalized + + print(is_normalized(uniform_state(2))) + ``` + """ + if wf.dim() == 1: + wf = wf.unsqueeze(0) + sum_probs: Tensor = (wf.abs() ** 2).sum(dim=1) + ones = torch.ones_like(sum_probs) + return torch.allclose(sum_probs, ones, rtol=0.0, atol=atol) # type: ignore[no-any-return] + + +def rand_bitstring(N: int) -> str: + """ + Creates a random bistring. + + Arguments: + N (int): The length of the bitstring. + + Returns: + A string. + + Examples: + ```python exec="on" source="material-block" result="json" + from qadence.states import rand_bitstring + + print(rand_bitstring(N=8)) + ``` + """ + return "".join(str(random.randint(0, 1)) for _ in range(N)) + + +def equivalent_state( + s0: torch.Tensor, s1: torch.Tensor, rtol: float = 0.0, atol: float = NORMALIZATION_ATOL +) -> bool: + fid = fidelity(s0, s1) + expected = torch.ones_like(fid) + return torch.allclose(fid, expected, rtol=rtol, atol=atol) # type: ignore[no-any-return] diff --git a/qadence/types.py b/qadence/types.py new file mode 100644 index 00000000..b50e75f5 --- /dev/null +++ b/qadence/types.py @@ -0,0 +1,345 @@ +from __future__ import annotations + +import importlib +from enum import Enum +from typing import Iterable, Tuple, Union + +import numpy as np +import sympy +import torch + +TNumber = Union[int, float, complex] +"""Union of python number types.""" + +TDrawColor = Tuple[float, float, float, float] + +TParameter = Union[TNumber, torch.Tensor, sympy.Basic, str] +"""Union of numbers, tensors, and parameter types.""" + +TArray = Union[Iterable, torch.Tensor, np.ndarray] +"""Union of common array types.""" + +TGenerator = Union[torch.Tensor, sympy.Array, sympy.Basic] +"""Union of torch tensors and numpy arrays.""" + +PI = torch.pi + +# Modules to be automatically added to the qadence namespace +__all__ = [ + "Endianness", + "Strategy", + "ResultType", + "ParameterType", + "BackendName", + "StateGeneratorType", + "LTSOrder", + "TensorType", + "DiffMode", + "BackendName", + "Interaction", + "OverlapMethod", + "AlgoHEvo", + "SerializationFormat", +] # type: ignore + + +class StrEnum(str, Enum): + def __str__(self) -> str: + """Used when dumping enum fields in a schema.""" + ret: str = self.value + return ret + + @classmethod + def list(cls) -> list[str]: + return list(map(lambda c: c.value, cls)) # type: ignore + + +class Strategy(StrEnum): + """Computing paradigm.""" + + DIGITAL = "Digital" + """Use the digital paradigm.""" + ANALOG = "Analog" + """Use the analog paradigm.""" + SDAQC = "sDAQC" + """Use the step-wise digital-analog QC paradigm.""" + BDAQC = "bDAQC" + """Use the banged digital-analog QC paradigm.""" + + +class Endianness(StrEnum): + """The endianness convention to use.""" + + BIG = "Big" + """Use Big endianness.""" + LITTLE = "Little" + """Use little endianness.""" + + +class ResultType(StrEnum): + """Available data types for generating certain results.""" + + STRING = "String" + """String Type.""" + TORCH = "Torch" + """Torch Tensor Type.""" + NUMPY = "Numpy" + """Numpy Array Type.""" + + +class ParameterType(StrEnum): + """Parameter types available in qadence.""" + + FEATURE = "Feature" + """FeatureParameters act as input and are not trainable.""" + VARIATIONAL = "Variational" + """VariationalParameters are trainable.""" + FIXED = "Fixed" + """Fixed/ constant parameters are neither trainable nor act as input.""" + + +class TensorType(StrEnum): + """Tensor Types for converting blocks to tensors.""" + + SPARSEDIAGONAL = "SparseDiagonal" + """Convert a diagonal observable block to a sparse diagonal if possible.""" + DENSE = "Dense" + """Convert a block to a dense tensor.""" + SPARSE = "Sparse" + """Convert a observable block to a sparse tensor.""" + + +class LTSOrder(StrEnum): + """ + Lie-Trotter-Suzuki approximation order. + """ + + BASIC = "BASIC" + """Basic.""" + ST2 = "ST2" + """ST2.""" + ST4 = "ST4" + """ST4.""" + + +class _DiffMode(StrEnum): + """Differentiation modes to choose from.""" + + GPSR = "gpsr" + """Basic generalized parameter shift rule.""" + AD = "ad" + """Automatic Differentiation.""" + + +class QubitSupportType(StrEnum): + """Qubit support types.""" + + GLOBAL = "global" + """Use global qubit support.""" + + +class Interaction(StrEnum): + """Interaction types used in + - [`add_interaction`][qadence.transpile.emulate.add_interaction]. + - [`hamiltonian_factory`][qadence.constructors.hamiltonians.hamiltonian_factory]. + """ + + ZZ = "ZZ" + """ZZ-Ising Interaction""" + NN = "NN" + """NN-Ising Interaction, N=(I-Z)/2""" + XY = "XY" + """XY Interaction""" + XYZ = "XYZ" + """XYZ Interaction""" + + +class _BackendName(StrEnum): + """The available backends for running circuits.""" + + PYQTORCH = "pyqtorch" + """The Pyqtorch backend.""" + BRAKET = "braket" + """The Braket backend.""" + PULSER = "pulser" + """The Pulser backend.""" + + +# If proprietary qadence_extensions is available, import the +# right function since more backends are supported. +try: + module = importlib.import_module("qadence_extensions.types") + BackendName = getattr(module, "BackendName") + DiffMode = getattr(module, "DiffMode") +except ModuleNotFoundError: + BackendName = _BackendName + DiffMode = _DiffMode + + +class StateGeneratorType(StrEnum): + """Methods to generate random states.""" + + RANDOM_ROTATIONS = "RandomRotations" + """Random Rotations.""" + HAAR_MEASURE_FAST = "HaarMeasureFast" + """HaarMeasure.""" + HAAR_MEASURE_SLOW = "HaarMeasureSlow" + """HaarMeasure non-optimized version.""" + + +class SerializationFormat(StrEnum): + """Available serialization formats for circuits.""" + + PT = "PT" + """The PT format used by Torch.""" + JSON = "JSON" + """The Json format.""" + + +class OverlapMethod(StrEnum): + """Overlap Methods to choose from.""" + + EXACT = "exact" + """Exact.""" + JENSEN_SHANNON = "jensen_shannon" + """Jensen-shannon.""" + COMPUTE_UNCOMPUTE = "compute_uncompute" + """Compute-uncompute.""" + SWAP_TEST = "swap_test" + """Swap-test.""" + HADAMARD_TEST = "hadamard_test" + """Hadamard-test.""" + + +class FigFormat(StrEnum): + """Available output formats for exporting visualized circuits to a file.""" + + PNG = "PNG" + """PNG format.""" + PDF = "PDF" + """PDF format.""" + SVG = "SVG" + """SVG format.""" + + +class AlgoHEvo(StrEnum): + """Hamiltonian Evolution algorithms that can be used by the backend.""" + + RK4 = "RK4" + """4th order Runge-Kutta approximation.""" + EIG = "EIG" + """Using Hamiltonian diagonalization.""" + EXP = "EXP" + """Using torch.matrix_exp on the generator matrix.""" + + +class LatticeTopology(StrEnum): + """Lattice topologies to choose from for the register.""" + + LINE = "line" + """Line-format lattice.""" + SQUARE = "square" + """Square lattice.""" + CIRCLE = "circle" + """Circular lattice.""" + ALL_TO_ALL = "all_to_all" + """All to all- connected lattice.""" + RECTANGULAR_LATTICE = "rectangular_lattice" + """Rectangular-shaped lattice.""" + TRIANGULAR_LATTICE = "triangular_lattice" + """Triangular-shaped shape.""" + HONEYCOMB_LATTICE = "honeycomb_lattice" + """Honeycomb-shaped lattice.""" + ARBITRARY = "arbitrary" + """Arbitrarily-shaped lattice.""" + + +class GenDAQC(StrEnum): + """The type of interaction for the DAQC transform.""" + + ZZ = "ZZ" + """ZZ""" + NN = "NN" + """NN""" + + +class OpName(StrEnum): + """A list of all available of digital-analog operations.""" + + # Digital operations + X = "X" + """The X gate.""" + Y = "Y" + """The Y gate.""" + Z = "Z" + """The Z gate.""" + N = "N" + """The N = (1/2)(I-Z) operator""" + H = "H" + """The Hadamard gate.""" + I = "I" # noqa + """The Identity gate.""" + ZERO = "Zero" + """The zero gate.""" + RX = "RX" + """The RX gate.""" + RY = "RY" + """The RY gate.""" + RZ = "RZ" + """The RZ gate.""" + U = "U" + """The U gate.""" + CNOT = "CNOT" + """The CNOT gate.""" + CZ = "CZ" + """The CZ gate.""" + MCZ = "MCZ" + """The Multicontrol CZ gate.""" + HAMEVO = "HamEvo" + """The Hamiltonian Evolution operation.""" + CRX = "CRX" + """The Control RX gate.""" + MCRX = "MCRX" + """The Multicontrol RX gate.""" + CRY = "CRY" + """The Controlled RY gate.""" + MCRY = "MCRY" + """The Multicontrol RY gate.""" + CRZ = "CRZ" + """The Control RZ gate.""" + MCRZ = "MCRZ" + """The Multicontrol RZ gate.""" + CSWAP = "CSWAP" + """The Control SWAP gate.""" + T = "T" + """The T gate.""" + # FIXME: Tdagger is not currently supported by any backend + TDAGGER = "TDagger" + """The T dagger gate.""" + S = "S" + """The S gate.""" + SDAGGER = "SDagger" + """The S dagger gate.""" + SWAP = "SWAP" + """The SWAP gate.""" + PHASE = "PHASE" + """The PHASE gate.""" + CPHASE = "CPHASE" + """The controlled PHASE gate.""" + MCPHASE = "MCPHASE" + """The Multicontrol PHASE gate.""" + TOFFOLI = "Toffoli" + """The Toffoli gate.""" + # Analog operations + ANALOGENTANG = "AnalogEntanglement" + """The analog entanglement operation.""" + ANALOGRX = "AnalogRX" + """The analog RX operation.""" + ANALOGRY = "AnalogRY" + """The analog RY operation.""" + ANALOGSWAP = "AnalogSWAP" + """The analog SWAP operation.""" + ENTANG = "entangle" + """The entanglement operation.""" + WAIT = "wait" + """The wait operation.""" diff --git a/qadence/utils.py b/qadence/utils.py new file mode 100644 index 00000000..1b43a0a9 --- /dev/null +++ b/qadence/utils.py @@ -0,0 +1,213 @@ +from __future__ import annotations + +import math +import warnings +from collections import Counter +from typing import Any + +import numpy as np +import sympy +import torch +from scipy.sparse.linalg import eigs +from torch.linalg import eigvals + +from qadence.logger import get_logger +from qadence.types import Endianness, ResultType, TNumber + +# Modules to be automatically added to the qadence namespace +__all__ = [] # type: ignore + + +logger = get_logger(__name__) + + +def bitstring_to_int(bstring: str, endianness: Endianness = Endianness.BIG) -> int: + # FIXME: Remove in v1.0.0 + warnings.warn("Deprecated function bitstring_to_int. Please use basis_to_int.", FutureWarning) + return basis_to_int(bstring, endianness) + + +def basis_to_int(basis: str, endianness: Endianness = Endianness.BIG) -> int: + """ + Converts a computational basis state to an int. + + - `endianness = "Big"` reads the most significant bit in qubit 0 (leftmost). + - `endianness = "Little"` reads the least significant bit in qubit 0 (leftmost). + + Arguments: + basis (str): A computational basis state. + endianness (Endianness): The Endianness when reading the basis state. + + Returns: + The corresponding integer. + + Examples: + ```python exec="on" source="material-block" result="json" + from qadence.utils import basis_to_int, Endianness + + k = basis_to_int(basis="10", endianness=Endianness.BIG) + print(k) + ``` + """ + if endianness == Endianness.BIG: + return int(basis, 2) + else: + return int(basis[::-1], 2) + + +def int_to_basis( + k: int, n_qubits: int | None = None, endianness: Endianness = Endianness.BIG +) -> str: + """ + Converts an integer to its corresponding basis state. + + - `endianness = "Big"` stores the most significant bit in qubit 0 (leftmost). + - `endianness = "Little"` stores the least significant bit in qubit 0 (leftmost). + + Arguments: + k (int): The int to convert. + n_qubits (int): The total number of qubits in the basis state. + endianness (Endianness): The Endianness of the resulting basis state. + + Returns: + A computational basis state. + + Examples: + ```python exec="on" source="material-block" result="json" + from qadence.utils import int_to_basis, Endianness + + bs = int_to_basis(k=1, n_qubits=2, endianness=Endianness.BIG) + print(bs) + ``` + """ + if n_qubits is None: + n_qubits = int(math.log(k + 0.6) / math.log(2)) + 1 + assert k <= 2**n_qubits - 1, "k can not be larger than 2**n_qubits-1." + basis = format(k, "0{}b".format(n_qubits)) + if endianness == Endianness.BIG: + return basis + else: + return basis[::-1] + + +def nqubits_to_basis( + n_qubits: int, + result_type: ResultType = ResultType.STRING, + endianness: Endianness = Endianness.BIG, +) -> list[str] | torch.Tensor | np.array: + """ + Creates all basis states for a given number of qubits, endianness and format. + + Arguments: + n_qubits: The total number of qubits. + result_type: The data type of the resulting states. + endianness: The Endianness of the resulting states. + + Returns: + The full computational basis for n_qubits. + + Examples: + ```python exec="on" source="material-block" result="json" + from qadence.utils import nqubits_to_basis, Endianness, ResultType + basis_type = ResultType.Torch + bs = nqubits_to_basis(n_qubits=2, result_type= basis_type, endianness=Endianness.BIG) + print(bs) + ``` + """ + basis_strings = [int_to_basis(k, n_qubits, endianness) for k in range(0, 2**n_qubits)] + if result_type == ResultType.STRING: + return basis_strings + else: + basis_list = [list(map(int, tuple(basis))) for basis in basis_strings] + if result_type == ResultType.TORCH: + return torch.stack([torch.tensor(basis) for basis in basis_list]) + elif result_type == ResultType.NUMPY: + return np.stack([np.array(basis) for basis in basis_list]) + + +def samples_to_integers(samples: Counter, endianness: Endianness = Endianness.BIG) -> Counter: + """ + Converts a Counter of basis state samples to integer values + + Args: + samples (Counter({bits: counts})): basis state sample counter. + endianness (Endianness): endianness to use for conversion. + + Returns: + Counter({ints: counts}): samples converted + """ + + return Counter({basis_to_int(k, endianness): v for k, v in samples.items()}) + + +def format_number(x: float | complex, num_digits: int = 3) -> str: + if isinstance(x, int): + return f"{x}" + elif isinstance(x, float): + return f"{x:.{num_digits}f}" + elif isinstance(x, complex): + re = "" if np.isclose(x.real, 0) else f"{x.real:.{num_digits}f}" + im = "" if np.isclose(x.imag, 0) else f"{x.imag:.{num_digits}f}" + if len(re) > 0 and len(im) > 0: + return f"{re}+{im}j" + elif len(re) > 0 and len(im) == 0: + return re + elif len(re) == 0 and len(im) > 0: + return f"{im}j" + else: + return "0" + else: + raise ValueError(f"Unknown number type: {type(x)}") + + +def format_parameter(p: sympy.Basic) -> str: + def round_expr(expr: sympy.Basic, num_digits: int) -> sympy.Basic: + return expr.xreplace({n: round(n, num_digits) for n in expr.atoms(sympy.Number)}) + + return str(round_expr(p, 3)) + + +def print_sympy_expr(expr: sympy.Expr, num_digits: int = 3) -> str: + """ + Converts all numerical values in a sympy expression to + something with fewer digits for better readability. + """ + from qadence.parameters import sympy_to_numeric + + round_dict = {sympy_to_numeric(n): round(n, num_digits) for n in expr.atoms(sympy.Number)} + return str(expr.xreplace(round_dict)) + + +def isclose( + x: TNumber | Any, y: TNumber | Any, rel_tol: float = 1e-5, abs_tol: float = 1e-07 +) -> bool: + if isinstance(x, complex) or isinstance(y, complex): + return abs(x - y) <= max(rel_tol * max(abs(x), abs(y)), abs_tol) # type: ignore + + return math.isclose(x, y, rel_tol=rel_tol, abs_tol=abs_tol) + + +def eigenvalues( + x: torch.Tensor, max_num_evals: int | None = None, max_num_gaps: int | None = None +) -> torch.Tensor: + if max_num_evals and not max_num_gaps: + # get specified number of eigenvalues of generator + eigenvals, _ = eigs(x.squeeze(0).numpy(), k=max_num_evals, which="LM") + elif max_num_gaps and not max_num_evals: + # get eigenvalues of generator corresponding to specified number of spectral gaps + k = int(np.ceil(0.5 * (1 + np.sqrt(1 + 8 * max_num_gaps)))) + eigenvals, _ = eigs(x.squeeze(0).numpy(), k=k, which="LM") + else: + # get all eigenvalues of generator + eigenvals = eigvals(x) + return eigenvals + + +def _round_complex(t: torch.Tensor, decimals: int = 4) -> torch.Tensor: + def _round(_t: torch.Tensor) -> torch.Tensor: + r = _t.real.round(decimals=decimals) + i = _t.imag.round(decimals=decimals) + return torch.complex(r, i) + + fn = torch.vmap(_round) + return fn(t)