diff --git a/pulser-core/pulser/backend/__init__.py b/pulser-core/pulser/backend/__init__.py index f4f9361b3..5fb31eee4 100644 --- a/pulser-core/pulser/backend/__init__.py +++ b/pulser-core/pulser/backend/__init__.py @@ -14,8 +14,45 @@ """Classes for backend execution.""" import pulser.noise_model as noise_model # For backwards compat -from pulser.backend.config import EmulatorConfig +from pulser.backend.abc import Backend, EmulatorBackend +from pulser.backend.config import EmulatorConfig, EmulationConfig from pulser.noise_model import NoiseModel # For backwards compat from pulser.backend.qpu import QPUBackend +from pulser.backend.results import Results +from pulser.backend.state import State +from pulser.backend.operator import Operator +from pulser.backend.observable import Callback, Observable +from pulser.backend.default_observables import ( + BitStrings, + CorrelationMatrix, + Energy, + EnergyVariance, + Expectation, + Fidelity, + Occupation, + SecondMomentOfEnergy, + StateResult, +) -__all__ = ["EmulatorConfig", "NoiseModel", "QPUBackend"] +__all__ = [ + "Backend", + "Callback", + "EmulatorBackend", + "EmulatorConfig", + "EmulationConfig", + "NoiseModel", + "Observable", + "Operator", + "QPUBackend", + "Results", + "State", + "BitStrings", + "CorrelationMatrix", + "Energy", + "EnergyVariance", + "Expectation", + "Fidelity", + "Occupation", + "SecondMomentOfEnergy", + "StateResult", +] diff --git a/pulser-core/pulser/backend/abc.py b/pulser-core/pulser/backend/abc.py index 704e91d55..39f84e7dd 100644 --- a/pulser-core/pulser/backend/abc.py +++ b/pulser-core/pulser/backend/abc.py @@ -14,34 +14,38 @@ """Base class for the backend interface.""" from __future__ import annotations -import typing from abc import ABC, abstractmethod +from collections.abc import Sequence +from typing import ClassVar +import pulser +from pulser.backend.config import EmulationConfig +from pulser.backend.results import Results from pulser.devices import Device -from pulser.result import Result -from pulser.sequence import Sequence - -Results = typing.Sequence[Result] class Backend(ABC): """The backend abstract base class.""" - def __init__(self, sequence: Sequence, mimic_qpu: bool = False) -> None: + def __init__( + self, sequence: pulser.Sequence, mimic_qpu: bool = False + ) -> None: """Starts a new backend instance.""" self.validate_sequence(sequence, mimic_qpu=mimic_qpu) self._sequence = sequence self._mimic_qpu = bool(mimic_qpu) @abstractmethod - def run(self) -> Results | typing.Sequence[Results]: + def run(self) -> Results | Sequence[Results]: """Executes the sequence on the backend.""" pass @staticmethod - def validate_sequence(sequence: Sequence, mimic_qpu: bool = False) -> None: + def validate_sequence( + sequence: pulser.Sequence, mimic_qpu: bool = False + ) -> None: """Validates a sequence prior to submission.""" - if not isinstance(sequence, Sequence): + if not isinstance(sequence, pulser.Sequence): raise TypeError( "'sequence' should be a `Sequence` instance" f", not {type(sequence)}." @@ -70,3 +74,27 @@ def validate_sequence(sequence: Sequence, mimic_qpu: bool = False) -> None: "the register's layout must be one of the layouts available " f"in '{device.name}.calibrated_register_layouts'." ) + + +class EmulatorBackend(Backend): + """The emulator backend parent class.""" + + default_config: ClassVar[EmulationConfig] + + def __init__( + self, + sequence: pulser.Sequence, + *, + config: EmulationConfig | None = None, + mimic_qpu: bool = False, + ) -> None: + """Initializes the backend.""" + super().__init__(sequence, mimic_qpu=mimic_qpu) + config = config or self.default_config + if not isinstance(config, EmulationConfig): + raise TypeError( + "'config' must be an instance of 'EmulationConfig', " + f"not {type(config)}." + ) + # See the BackendConfig definition to see why this works + self._config = type(self.default_config)(**config._backend_options) diff --git a/pulser-core/pulser/backend/config.py b/pulser-core/pulser/backend/config.py index 2da0e6f99..7c3fd631c 100644 --- a/pulser-core/pulser/backend/config.py +++ b/pulser-core/pulser/backend/config.py @@ -14,28 +14,236 @@ """Defines the backend configuration classes.""" from __future__ import annotations +import copy +import warnings +from collections import Counter from dataclasses import dataclass, field -from typing import Any, Literal, Sequence, get_args +from typing import ( + Any, + Generic, + Literal, + Sequence, + SupportsFloat, + TypeVar, + cast, + get_args, +) import numpy as np +from numpy.typing import ArrayLike +import pulser.math as pm +from pulser.backend.observable import Observable +from pulser.backend.state import State from pulser.noise_model import NoiseModel EVAL_TIMES_LITERAL = Literal["Full", "Minimal", "Final"] +StateType = TypeVar("StateType", bound=State) + -@dataclass(frozen=True) class BackendConfig: - """The base backend configuration. + """The base backend configuration.""" - Attributes: - backend_options: A dictionary of backend specific options. + _backend_options: dict[str, Any] + + def __init__(self, **backend_options: Any) -> None: + """Initializes the backend config.""" + cls_name = self.__class__.__name__ + if invalid_kwargs := ( + set(backend_options) + - (self._expected_kwargs() | {"backend_options"}) + ): + warnings.warn( + f"{cls_name!r} received unexpected keyword arguments: " + f"{invalid_kwargs}; only the following keyword " + f"arguments are expected: {self._expected_kwargs()}.", + stacklevel=2, + ) + # Prevents potential issues with mutable arguments + self._backend_options = copy.deepcopy(backend_options) + if "backend_options" in backend_options: + with warnings.catch_warnings(): + warnings.filterwarnings("always") + warnings.warn( + f"The 'backend_options' argument of {cls_name!r} " + "has been deprecated. Please provide the options " + f"as keyword arguments directly to '{cls_name}()'.", + DeprecationWarning, + stacklevel=2, + ) + self._backend_options.update(backend_options["backend_options"]) + + def _expected_kwargs(self) -> set[str]: + return set() + + def __getattr__(self, name: str) -> Any: + if ( + # Needed to avoid recursion error + "_backend_options" in self.__dict__ + and name in self._backend_options + ): + return self._backend_options[name] + raise AttributeError(f"{name!r} has not been passed to {self!r}.") + + +class EmulationConfig(BackendConfig, Generic[StateType]): + """Configures an emulation on a backend. + + Args: + observables: A sequence of observables to compute at specific + evaluation times. The observables without specified evaluation + times will use this configuration's 'default_evaluation_times'. + default_evaluation_times: The default times at which observables + are computed. Can be a sequence of relative times between 0 + (the start of the sequence) and 1 (the end of the sequence). + Can also be specified as "Full", in which case every step in the + emulation will also be an evaluation time. + initial_state: The initial state from which emulation starts. If + specified, the state type needs to be compatible with the emulator + backend. If left undefined, defaults to starting with all qudits + in the ground state. + with_modulation: Whether to emulate the sequence with the programmed + input or the expected output. + interaction_matrix: An optional interaction matrix to replace the + interaction terms in the Hamiltonian. For an N-qudit system, + must be an NxN symmetric matrix where entry (i, j) dictates + the interaction coefficient between qudits i and j, ie it replaces + the C_n/r_{ij}^n term. + prefer_device_noise_model: If the sequence's device has a default noise + model, this option signals the backend to prefer it over the noise + model given with this configuration. + noise_model: An optional noise model to emulate the sequence with. + Ignored if the sequence's device has default noise model and + `prefer_device_noise_model=True`. """ - backend_options: dict[str, Any] = field(default_factory=dict) + observables: Sequence[Observable] + default_evaluation_times: np.ndarray | Literal["Full"] + initial_state: StateType | None + with_modulation: bool + interaction_matrix: pm.AbstractArray | None + prefer_device_noise_model: bool + noise_model: NoiseModel + + def __init__( + self, + *, + observables: Sequence[Observable] = (), + # Default evaluation times for observables that don't specify one + default_evaluation_times: Sequence[SupportsFloat] | Literal["Full"] = ( + 1.0, + ), + initial_state: StateType | None = None, # Default is ggg... + with_modulation: bool = False, + interaction_matrix: ArrayLike | None = None, + prefer_device_noise_model: bool = False, + noise_model: NoiseModel = NoiseModel(), + **backend_options: Any, + ) -> None: + """Initializes the EmulationConfig.""" + obs_tags = [] + for obs in observables: + if not isinstance(obs, Observable): + raise TypeError( + "All entries in 'observables' must be instances of " + f"Observable. Instead, got instance of type {type(obs)}." + ) + obs_tags.append(obs.tag) + repeated_tags = [k for k, v in Counter(obs_tags).items() if v > 1] + if repeated_tags: + raise ValueError( + "Some of the provided 'observables' share identical tags. Use " + "'tag_suffix' when instantiating multiple instances of the " + "same observable so they can be distinguished. " + f"Repeated tags found: {repeated_tags}" + ) + if default_evaluation_times != "Full": + eval_times_arr = np.array(default_evaluation_times, dtype=float) + if np.any((eval_times_arr < 0.0) | (eval_times_arr > 1.0)): + raise ValueError( + "All evaluation times must be between 0. and 1. " + f"Instead, got {default_evaluation_times}." + ) + default_evaluation_times = cast(Sequence[float], eval_times_arr) + + if initial_state is not None and not isinstance(initial_state, State): + raise TypeError( + "When defined, 'initial_state' must be an instance of State;" + f" got object of type {type(initial_state)} instead." + ) + + if interaction_matrix is not None: + interaction_matrix = pm.AbstractArray(interaction_matrix) + _shape = interaction_matrix.shape + if len(_shape) != 2 or _shape[0] != _shape[1]: + raise ValueError( + "'interaction_matrix' must be a square matrix. Instead, " + f"an array of shape {_shape} was given." + ) + if ( + initial_state is not None + and _shape[0] != initial_state.n_qudits + ): + raise ValueError( + f"The received interaction matrix of shape {_shape} is " + "incompatible with the received initial state of " + f"{initial_state.n_qudits} qudits." + ) -@dataclass(frozen=True) + if not isinstance(noise_model, NoiseModel): + raise TypeError( + "'noise_model' must be a NoiseModel instance," + f" not {type(noise_model)}." + ) + + super().__init__( + observables=tuple(observables), + default_evaluation_times=default_evaluation_times, + initial_state=initial_state, + with_modulation=bool(with_modulation), + interaction_matrix=interaction_matrix, + prefer_device_noise_model=bool(prefer_device_noise_model), + noise_model=noise_model, + **backend_options, + ) + + def _expected_kwargs(self) -> set[str]: + return super()._expected_kwargs() | { + "observables", + "default_evaluation_times", + "initial_state", + "with_modulation", + "interaction_matrix", + "prefer_device_noise_model", + "noise_model", + } + + def is_evaluation_time(self, t: float, tol: float = 1e-6) -> bool: + """Assesses whether a relative time is an evaluation time.""" + return ( + self.default_evaluation_times == "Full" and 0.0 <= t <= 1.0 + ) or ( + self.is_time_in_evaluation_times( + t, self.default_evaluation_times, tol=tol + ) + ) + + @staticmethod + def is_time_in_evaluation_times( + t: float, evaluation_times: ArrayLike, tol: float = 1e-6 + ) -> bool: + """Checks if a time is within a collection of evaluation times.""" + return 0.0 <= t <= 1.0 and bool( + np.any(np.abs(np.array(evaluation_times, dtype=float) - t) <= tol) + ) + + +# Legacy class + + +@dataclass class EmulatorConfig(BackendConfig): """The configuration for emulator backends. @@ -74,6 +282,7 @@ class EmulatorConfig(BackendConfig): `prefer_device_noise_model=True`. """ + backend_options: dict[str, Any] = field(default_factory=dict) sampling_rate: float = 1.0 evaluation_times: float | Sequence[float] | EVAL_TIMES_LITERAL = "Full" initial_state: Literal["all-ground"] | Sequence[complex] = "all-ground" @@ -82,6 +291,7 @@ class EmulatorConfig(BackendConfig): noise_model: NoiseModel = field(default_factory=NoiseModel) def __post_init__(self) -> None: + # TODO: Deprecate once QutipBackendV2 is feature complete if not (0 < self.sampling_rate <= 1.0): raise ValueError( "The sampling rate (`sampling_rate` = " diff --git a/pulser-core/pulser/backend/default_observables.py b/pulser-core/pulser/backend/default_observables.py new file mode 100644 index 000000000..ae7d26b54 --- /dev/null +++ b/pulser-core/pulser/backend/default_observables.py @@ -0,0 +1,412 @@ +# Copyright 2024 Pulser Development Team +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""Defines the default observables.""" +from __future__ import annotations + +import copy +import functools +from collections import Counter +from collections.abc import Sequence +from typing import Any, Type + +import pulser.math as pm +from pulser.backend.config import EmulationConfig +from pulser.backend.observable import Observable +from pulser.backend.operator import Operator, OperatorType +from pulser.backend.state import Eigenstate, State, StateType + + +class StateResult(Observable): + """Stores the quantum state at the evaluation times. + + Args: + evaluation_times: The relative times at which to store the state. + If left as `None`, uses the `default_evaluation_times` of the + backend's `EmulationConfig`. + tag_suffix: An optional suffix to append to the tag. Needed if + multiple instances of the same observable are given to the + same EmulationConfig. + """ + + @property + def _base_tag(self) -> str: + return "state" + + def apply(self, *, state: StateType, **kwargs: Any) -> StateType: + """Calculates the observable to store in the Results.""" + return copy.deepcopy(state) + + +class BitStrings(Observable): + """Stores bitstrings sampled from the state at the evaluation times. + + Error rates are taken from the EmulationConfig given to the backend. + The bitstrings are stored as a Counter[str]. + + Args: + evaluation_times: The relative times at which to sample bitstrings. + If left as `None`, uses the `default_evaluation_times` of the + backend's `EmulationConfig`. + num_shots: How many bitstrings to sample each time this observable + is computed. + one_state: The eigenstate that measures to 1. Can be left undefined + if the state's eigenstates form a known eigenbasis with a + defined "one state". + tag_suffix: An optional suffix to append to the tag. Needed if + multiple instances of the same observable are given to the + same EmulationConfig. + """ + + def __init__( + self, + *, + evaluation_times: Sequence[float] | None = None, + num_shots: int = 1000, + one_state: Eigenstate | None = None, + tag_suffix: str | None = None, + ): + """Initializes the observable.""" + super().__init__( + evaluation_times=evaluation_times, tag_suffix=tag_suffix + ) + if num_shots < 1: + raise ValueError( + "'num_shots' must be greater than or equal to 1, " + f"not {num_shots}." + ) + self.num_shots = int(num_shots) + self.one_state = one_state + + @property + def _base_tag(self) -> str: + return "bitstrings" + + def apply( + self, + *, + config: EmulationConfig, + state: State, + **kwargs: Any, + ) -> Counter[str]: + """Calculates the observable to store in the Results.""" + return state.sample( + num_shots=self.num_shots, + one_state=self.one_state, + p_false_pos=config.noise_model.p_false_pos, + p_false_neg=config.noise_model.p_false_neg, + ) + + +class Fidelity(Observable): + """Stores the fidelity with a pure state at the evaluation times. + + The fidelity uses the overlap between the given state and the state of + the system at each evaluation time. For pure states, this corresponds + to $|<ψ|φ(t)>|^2$ for the given state $|ψ>$ and the state $|φ(t)>$ + obtained by time evolution. + + Args: + state: The state |ψ>. Note that this must be of an appropriate type + for the backend. + evaluation_times: The relative times at which to compute the fidelity. + If left as `None`, uses the `default_evaluation_times` of the + backend's `EmulationConfig`. + tag_suffix: An optional suffix to append to the tag. Needed if + multiple instances of the same observable are given to the + same EmulationConfig. + """ + + def __init__( + self, + state: State, + *, + evaluation_times: Sequence[float] | None = None, + tag_suffix: str | None = None, + ): + """Initializes the observable.""" + super().__init__( + evaluation_times=evaluation_times, tag_suffix=tag_suffix + ) + if not isinstance(state, State): + raise TypeError( + f"'state' must be a State instance; got {type(state)} instead." + ) + self.state = state + + @property + def _base_tag(self) -> str: + return "fidelity" + + def apply(self, *, state: State, **kwargs: Any) -> Any: + """Calculates the observable to store in the Results.""" + return self.state.overlap(state) + + +class Expectation(Observable): + """Stores the expectation of the given operator on the current state. + + Args: + evaluation_times: The relative times at which to compute the + expectation value. If left as `None`, uses the + `default_evaluation_times` of the backend's `EmulationConfig`. + operator: The operator to measure. Must be of the appropriate type + for the backend. + tag_suffix: An optional suffix to append to the tag. Needed if + multiple instances of the same observable are given to the + same EmulationConfig. + """ + + def __init__( + self, + operator: Operator, + *, + evaluation_times: Sequence[float] | None = None, + tag_suffix: str | None = None, + ): + """Initializes the observable.""" + super().__init__( + evaluation_times=evaluation_times, tag_suffix=tag_suffix + ) + if not isinstance(operator, Operator): + raise TypeError( + "'operator' must be an Operator instance;" + f" got {type(operator)} instead." + ) + self.operator = operator + + @property + def _base_tag(self) -> str: + return "expectation" + + def apply(self, *, state: State, **kwargs: Any) -> Any: + """Calculates the observable to store in the Results.""" + return self.operator.expect(state) + + +class CorrelationMatrix(Observable): + """Stores the correlation matrix for the current state. + + The correlation matrix is calculated as + [[<φ(t)|n_i n_j|φ(t)> for j in qubits] for i in qubits] + where n_k = |one_state> str: + return "correlation_matrix" + + @staticmethod + @functools.cache + def _get_number_operator( + qudit_ids: frozenset[int], + n_qudits: int, + eigenstates: Sequence[Eigenstate], + one_state: Eigenstate, + op_type: Type[OperatorType], + ) -> OperatorType: + n_op = {one_state * 2: 1.0} + return op_type.from_operator_repr( + eigenstates=eigenstates, + n_qudits=n_qudits, + operations=[(1.0, [(n_op, qudit_ids)])], + ) + + def apply( + self, *, state: State, hamiltonian: Operator, **kwargs: Any + ) -> list[list]: + """Calculates the observable to store in the Results.""" + + @functools.cache + def calc_expectation(qudit_ids: frozenset[int]) -> Any: + return self._get_number_operator( + qudit_ids, + state.n_qudits, + state.eigenstates, + self.one_state or state.infer_one_state(), + type(hamiltonian), + ).expect(state) + + return [ + [ + calc_expectation(frozenset((i, j))) + for j in range(state.n_qudits) + ] + for i in range(state.n_qudits) + ] + + +class Occupation(Observable): + """Stores the occupation number of an eigenstate on each qudit. + + For every qudit i, calculates <φ(t)|n_i|φ(t)>, where + n_i = |one_state> str: + return "occupation" + + def apply( + self, *, state: State, hamiltonian: Operator, **kwargs: Any + ) -> list: + """Calculates the observable to store in the Results.""" + return [ + CorrelationMatrix._get_number_operator( + frozenset((i,)), + state.n_qudits, + state.eigenstates, + self.one_state or state.infer_one_state(), + type(hamiltonian), + ).expect(state) + for i in range(state.n_qudits) + ] + + +class Energy(Observable): + """Stores the energy of the system at the evaluation times. + + Args: + evaluation_times: The relative times at which to compute the energy. + If left as `None`, uses the `default_evaluation_times` of the + backend's `EmulationConfig`. + tag_suffix: An optional suffix to append to the tag. Needed if + multiple instances of the same observable are given to the + same EmulationConfig. + """ + + @property + def _base_tag(self) -> str: + return "energy" + + def apply( + self, *, state: State, hamiltonian: Operator, **kwargs: Any + ) -> Any: + """Calculates the observable to store in the Results.""" + return hamiltonian.expect(state) + + +class EnergyVariance(Observable): + r"""Stores the varaiance of the Hamiltonian at the evaluation times. + + The variance of the Hamiltonian at time t is calculated by + $\\langle φ(t)|H(t)^2|φ(t)\\rangle - \\langle φ(t)|H(t)|φ(t)\\rangle^2$ + + + Args: + evaluation_times: The relative times at which to compute the variance. + If left as `None`, uses the `default_evaluation_times` of the + backend's `EmulationConfig`. + tag_suffix: An optional suffix to append to the tag. Needed if + multiple instances of the same observable are given to the + same EmulationConfig. + """ + + @property + def _base_tag(self) -> str: + return "energy_variance" + + def apply( + self, *, state: State, hamiltonian: Operator, **kwargs: Any + ) -> Any: + """Calculates the observable to store in the Results.""" + # This works for state vectors and density matrices and avoids + # squaring the hamiltonian + h_state = hamiltonian.apply_to(state) + result: pm.AbstractArray = pm.sqrt( + h_state.overlap(h_state).real + ) - state.overlap(h_state) + if not result.requires_grad: + return float(result) + # If the result requires_grad, return the AbstractArray + return result # pragma: no cover + + +class SecondMomentOfEnergy(Observable): + """Stores the expectation value of H(t)^2 at the evaluation times. + + Useful for computing the variance when averaging over many executions of + the program. + + Args: + evaluation_times: The relative times at which to compute the variance. + If left as `None`, uses the `default_evaluation_times` of the + backend's `EmulationConfig`. + tag_suffix: An optional suffix to append to the tag. Needed if + multiple instances of the same observable are given to the + same EmulationConfig. + """ + + @property + def _base_tag(self) -> str: + return "second_moment_of_energy" + + def apply( + self, *, state: State, hamiltonian: Operator, **kwargs: Any + ) -> Any: + """Calculates the observable to store in the Results.""" + # This works for state vectors and density matrices and avoids + # squaring the hamiltonian + h_state = hamiltonian.apply_to(state) + result = pm.sqrt(h_state.overlap(h_state).real) + if not result.requires_grad: + return float(result) + return result # pragma: no cover diff --git a/pulser-core/pulser/backend/observable.py b/pulser-core/pulser/backend/observable.py new file mode 100644 index 000000000..547a7e91b --- /dev/null +++ b/pulser-core/pulser/backend/observable.py @@ -0,0 +1,180 @@ +# Copyright 2024 Pulser Development Team +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""Defines the abstract base class for a callback and an observable.""" +from __future__ import annotations + +import uuid +from abc import ABC, abstractmethod +from collections.abc import Sequence +from typing import TYPE_CHECKING, Any + +import numpy as np + +from pulser.backend.operator import Operator +from pulser.backend.state import State + +if TYPE_CHECKING: + from pulser.backend.config import EmulationConfig + from pulser.backend.results import Results + + +class Callback(ABC): + """A general Callback that is called during the emulation.""" + + def __init__(self) -> None: + """Initializes a Callback.""" + self._uuid: uuid.UUID = uuid.uuid4() + + @property + def uuid(self) -> uuid.UUID: + """A universal unique identifier for this instance.""" + return self._uuid + + @abstractmethod + def __call__( + self, + config: EmulationConfig, + t: float, + state: State, + hamiltonian: Operator, + result: Results, + ) -> None: + """Specifies a call to the callback at a specific time. + + This is called after each time step performed by the emulator. + By default it calls `apply()` to compute a result and put it in Results + if t in self.evaluation_times. + It can be overloaded to define general callbacks that don't put results + in the Results object. + + Args: + config: The config object passed to the backend. + t: The relative time as a float between 0 and 1. + state: The current state. + hamiltonian: The Hamiltonian at this time. + result: The Results object to store the result in. + """ + pass + + +class Observable(Callback): + """The Observable abstract base class. + + Args: + evaluation_times: The times at which to add a result to Results. + If left as `None`, uses the `default_evaluation_times` of the + backend's `EmulationConfig`. + tag_suffix: An optional suffix to append to the tag. Needed if + multiple instances of the same observable are given to the + same EmulationConfig. + """ + + def __init__( + self, + *, + evaluation_times: Sequence[float] | None = None, + tag_suffix: str | None = None, + ): + """Initializes the observable.""" + super().__init__() + if evaluation_times is not None: + eval_times_arr = np.array(evaluation_times, dtype=float) + if np.any((eval_times_arr < 0.0) | (eval_times_arr > 1.0)): + raise ValueError( + "All evaluation times must be between 0. and 1. " + f"Instead, got {evaluation_times}." + ) + self.evaluation_times = evaluation_times + self._tag_suffix = tag_suffix + + @property + @abstractmethod + def _base_tag(self) -> str: + pass + + @property + def tag(self) -> str: + """Label for the observable, used to index the Results object. + + Within a Results instance, all computed observables must have different + tags. + + Returns: + The tag of the observable. + """ + if self._tag_suffix is None: + return self._base_tag + return f"{self._base_tag}_{self._tag_suffix}" + + def __call__( + self, + config: EmulationConfig, + t: float, + state: State, + hamiltonian: Operator, + result: Results, + ) -> None: + """Specifies a call to the observable at a specific time. + + This is called after each time step performed by the emulator. + By default it calls `apply()` to compute a result and put it in Results + if t in self.evaluation_times. + It can be overloaded to define general callbacks that don't put results + in the Results object. + + Args: + config: The config object passed to the backend. + t: The relative time as a float between 0 and 1. + state: The current state. + hamiltonian: The Hamiltonian at this time. + result: The Results object to store the result in. + time_tol: Tolerance below which two time values are considered + equal. + """ + time_tol = ( + (0.5 / result.total_duration) if result.total_duration else 1e-6 + ) + if ( + self.evaluation_times is not None + and config.is_time_in_evaluation_times( + t, self.evaluation_times, tol=time_tol + ) + ) or config.is_evaluation_time(t, tol=time_tol): + value_to_store = self.apply( + config=config, state=state, hamiltonian=hamiltonian + ) + result._store(observable=self, time=t, value=value_to_store) + + @abstractmethod + def apply( + self, + *, + config: EmulationConfig, + state: State, + hamiltonian: Operator, + ) -> Any: + """Calculates the observable to store in the Results. + + Args: + config: The config object passed to the backend. + state: The current state. + hamiltonian: The Hamiltonian at this time. + + Returns: + The result to put in Results. + """ + pass + + def __repr__(self) -> str: + return f"{self.tag}:{self.uuid}" diff --git a/pulser-core/pulser/backend/operator.py b/pulser-core/pulser/backend/operator.py new file mode 100644 index 000000000..e0a7eb262 --- /dev/null +++ b/pulser-core/pulser/backend/operator.py @@ -0,0 +1,140 @@ +# Copyright 2024 Pulser Development Team +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""Defines the abstract base class for a quantum operator.""" +from __future__ import annotations + +from abc import ABC, abstractmethod +from collections.abc import Collection, Mapping, Sequence +from typing import Generic, Type, TypeVar + +from pulser.backend.state import Eigenstate, State + +ArgScalarType = TypeVar("ArgScalarType") +ReturnScalarType = TypeVar("ReturnScalarType") +StateType = TypeVar("StateType", bound=State) +OperatorType = TypeVar("OperatorType", bound="Operator") + +QuditOp = Mapping[str, ArgScalarType] # single qudit operator +TensorOp = Sequence[ + tuple[QuditOp, Collection[int]] +] # QuditOp applied to set of qudits +FullOp = Sequence[tuple[ArgScalarType, TensorOp]] # weighted sum of TensorOp + + +class Operator(ABC, Generic[ArgScalarType, ReturnScalarType, StateType]): + """Base class for a quantum operator.""" + + @abstractmethod + def apply_to(self, state: StateType, /) -> StateType: + """Apply the operator to a state. + + Args: + state: The state to apply this operator to. + + Returns: + The resulting state. + """ + pass + + @abstractmethod + def expect(self, state: StateType, /) -> ReturnScalarType: + """Compute the expectation value of self on the given state. + + Args: + state: The state with which to compute. + + Returns: + The expectation value. + """ + pass + + @abstractmethod + def __add__(self: OperatorType, other: OperatorType, /) -> OperatorType: + """Computes the sum of two operators. + + Args: + other: The other operator. + + Returns: + The summed operator. + """ + pass + + @abstractmethod + def __rmul__(self: OperatorType, scalar: ArgScalarType) -> OperatorType: + """Scale the operator by a scalar factor. + + Args: + scalar: The scalar factor. + + Returns: + The scaled operator. + """ + pass + + @abstractmethod + def __matmul__(self: OperatorType, other: OperatorType) -> OperatorType: + """Compose two operators where 'self' is applied after 'other'. + + Args: + other: The operator to compose with self. + + Returns: + The composed operator. + """ + pass + + @classmethod + @abstractmethod + def from_operator_repr( + cls: Type[OperatorType], + *, + eigenstates: Sequence[Eigenstate], + n_qudits: int, + operations: FullOp, + ) -> OperatorType: + """Create an operator from the operator representation. + + The full operator representation (FullOp is a weigthed sum of tensor + operators (TensorOp), written as a sequence of coefficient and tensor + operator pairs, ie + + `FullOp = Sequence[tuple[ScalarType, TensorOp]]` + + Each TensorOp is itself a sequence of qudit operators (QuditOp) applied + to mutually exclusive sets of qudits (represented by their indices), ie + + `TensorOp = Sequence[tuple[QuditOp, Collection[int]]]` + + Qudits without an associated QuditOp are applied the identity operator. + + Finally, each QuditOp is represented as weighted sum of pre-defined + single-qudit operators. It is given as a mapping between a string + representation of the single-qudit operator and its respective + coefficient, ie + + `QuditOp = Mapping[str, ScalarType]` + + By default it identifies strings 'ij' as single-qudit operators, where + i and j are eigenstates that denote |i> tuple[Result, ...]: + def results(self) -> tuple[Results, ...]: """The actual results, obtained after execution is done.""" - return self._results + return self._results_seq @property def batch_id(self) -> str: @@ -115,14 +115,14 @@ def get_batch_status(self) -> BatchStatus: """Gets the status of the batch linked to these results.""" return self._connection._get_batch_status(self._batch_id) - def get_available_results(self) -> dict[str, Result]: + def get_available_results(self) -> dict[str, Results]: """Returns the available results. Unlike the `results` property, this method does not raise an error if some of the jobs do not have results. Returns: - dict[str, Result]: A dictionary mapping the job ID to its results. + dict[str, Results]: A dictionary mapping the job ID to its results. Jobs with no result are omitted. """ results = { @@ -138,14 +138,14 @@ def get_available_results(self) -> dict[str, Result]: return results def __getattr__(self, name: str) -> Any: - if name == "_results": + if name == "_results_seq": try: - self._results = tuple( + self._results_seq = tuple( self._connection._fetch_result( self.batch_id, self._job_ids ) ) - return self._results + return self._results_seq except RemoteResultsError as e: raise RemoteResultsError( "Results are not available for all jobs. Use the " @@ -168,21 +168,21 @@ def submit( open: bool = True, batch_id: str | None = None, **kwargs: Any, - ) -> RemoteResults | tuple[RemoteResults, ...]: + ) -> RemoteResults: """Submit a job for execution.""" pass @abstractmethod def _fetch_result( self, batch_id: str, job_ids: list[str] | None - ) -> typing.Sequence[Result]: + ) -> typing.Sequence[Results]: """Fetches the results of a completed batch.""" pass @abstractmethod def _query_job_progress( self, batch_id: str - ) -> Mapping[str, tuple[JobStatus, Result | None]]: + ) -> Mapping[str, tuple[JobStatus, Results | None]]: """Fetches the status and results of all the jobs in a batch. Unlike `_fetch_result`, this method does not raise an error if some @@ -251,7 +251,7 @@ def __init__( def run( self, job_params: list[JobParams] | None = None, wait: bool = False - ) -> RemoteResults | tuple[RemoteResults, ...]: + ) -> RemoteResults: """Runs the sequence on the remote backend and returns the result. Args: @@ -308,13 +308,10 @@ def __init__(self, backend: RemoteBackend) -> None: self.backend = backend def __enter__(self) -> _OpenBatchContextManager: - batch = cast( - RemoteResults, - self.backend._connection.submit( - self.backend._sequence, - open=True, - **self.backend._submit_kwargs(), - ), + batch = self.backend._connection.submit( + self.backend._sequence, + open=True, + **self.backend._submit_kwargs(), ) self.backend._batch_id = batch.batch_id return self diff --git a/pulser-core/pulser/backend/results.py b/pulser-core/pulser/backend/results.py new file mode 100644 index 000000000..257866e2e --- /dev/null +++ b/pulser-core/pulser/backend/results.py @@ -0,0 +1,157 @@ +# Copyright 2024 Pulser Development Team +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""Defines the base class for storing backend results.""" +from __future__ import annotations + +import collections.abc +import typing +import uuid +from dataclasses import dataclass, field +from typing import Any, TypeVar, overload + +from pulser.backend.observable import Observable + + +@dataclass +class Results: + """A collection of results. + + Args: + atom_order: The order of the atoms/qudits in the results. + total_duration: The total duration of the sequence, in ns. + """ + + atom_order: tuple[str, ...] + total_duration: int + _results: dict[uuid.UUID, dict[float, Any]] = field(init=False) + _tagmap: dict[str, uuid.UUID] = field(init=False) + + def __post_init__(self) -> None: + self._results = {} + self._tagmap = {} + + def _store( + self, *, observable: Observable, time: float, value: Any + ) -> None: + """Store the result of an observable at a specific time. + + Args: + observable: The observable computing the result. + time: The relative time at which the observable was taken. + value: The value of the observable. + """ + self._results.setdefault(observable.uuid, {}) + + if time in self._results[observable.uuid]: + raise RuntimeError( + f"A value is already stored for observable '{observable.tag}'" + f" at time {time}." + ) + self._tagmap[observable.tag] = observable.uuid + self._results[observable.uuid][time] = value + + def __getattr__(self, name: str) -> Any: + if name in self._tagmap: + return dict(self._results[self._tagmap[name]]) + raise AttributeError(f"{name!r} is not in the results.") + + def get_result_tags(self) -> list[str]: + """Get a list of results tags present in this object.""" + return list(self._tagmap.keys()) + + def get_result_times(self, observable: Observable | str) -> list[float]: + """Get a list of times for which the given result has been stored. + + Args: + observable: The observable instance used to calculate the result + or its tag. + + Returns: + List of relative times. + """ + return list(self._results[self._find_uuid(observable)].keys()) + + def get_result(self, observable: Observable | str, time: float) -> Any: + """Get the a specific result at a given time. + + Args: + observable: The observable instance used to calculate the result + or its tag. + time: Relative time at which to get the result. + + Returns: + The result. + """ + try: + return self._results[self._find_uuid(observable)][time] + except KeyError: + raise ValueError( + f"{observable!r} is not available at time {time}." + ) + + def get_tagged_results(self) -> dict[str, dict[float, Any]]: + """Gets the results for every tag. + + Returns: + A mapping between a tag and the results associated to it, + at every evaluation time. + """ + return { + tag: dict(self._results[uuid_]) + for tag, uuid_ in self._tagmap.items() + } + + def _find_uuid(self, observable: Observable | str) -> uuid.UUID: + if isinstance(observable, Observable): + if observable.uuid not in self._results: + raise ValueError( + f"'{observable!r}' has not been stored in the results" + ) + return observable.uuid + try: + return self._tagmap[observable] + except KeyError: + raise ValueError( + f"{observable!r} is not an Observable instance " + "nor a known observable tag in the results." + ) + + +ResultsType = TypeVar("ResultsType", bound=Results) + + +class ResultsSequence(typing.Sequence[ResultsType]): + """An immutable sequence of results.""" + + _results_seq: tuple[ResultsType, ...] + + @overload + def __getitem__(self, key: int) -> ResultsType: + pass + + @overload + def __getitem__(self, key: slice) -> tuple[ResultsType, ...]: + pass + + def __getitem__( + self, key: int | slice + ) -> ResultsType | tuple[ResultsType, ...]: + return self._results_seq[key] + + def __len__(self) -> int: + return len(self._results_seq) + + def __iter__(self) -> collections.abc.Iterator[ResultsType]: + for res in self._results_seq: + yield res diff --git a/pulser-core/pulser/backend/state.py b/pulser-core/pulser/backend/state.py new file mode 100644 index 000000000..87f0f39bc --- /dev/null +++ b/pulser-core/pulser/backend/state.py @@ -0,0 +1,173 @@ +# Copyright 2024 Pulser Development Team +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""Defines the abstract base class for a quantum state.""" +from __future__ import annotations + +from abc import ABC, abstractmethod +from collections import Counter +from collections.abc import Sequence +from typing import Generic, Literal, Type, TypeVar, Union + +import numpy as np + +from pulser.channels.base_channel import States + +Eigenstate = Union[States, Literal["0", "1"]] + +ArgScalarType = TypeVar("ArgScalarType") +ReturnScalarType = TypeVar("ReturnScalarType") +StateType = TypeVar("StateType", bound="State") + + +class State(ABC, Generic[ArgScalarType, ReturnScalarType]): + """Base class enforcing an API for quantum states. + + Each backend will implement its own type of state and the methods below. + """ + + _eigenstates: Sequence[Eigenstate] + + @property + @abstractmethod + def n_qudits(self) -> int: + """The number of qudits in the state.""" + pass + + @property + def eigenstates(self) -> tuple[Eigenstate, ...]: + """The eigenstates that form a qudit's eigenbasis. + + The order of the states should match the order in a numerical (ie + state vector or density matrix) representation. + For example, with eigenstates ("a", "b", ...), "a" will be associated + to eigenvector (1, 0, ...), "b" to (0, 1, ...) and so on. + """ + return tuple(self._eigenstates) + + @property + def qudit_dim(self) -> int: + """The dimensions (ie number of eigenstates) of a qudit.""" + return len(self.eigenstates) + + def get_basis_state_from_index(self, index: int) -> str: + """Generates a basis state combination from its index in the state. + + Assumes a state vector representation regardless of the actual support + of the state. + + Args: + index: The position of the state in a state vector. + + Returns: + The basis state combination for the desired index. + """ + if index < 0: + raise ValueError( + f"'index' must be a non-negative integer;" + f" got {index} instead." + ) + return "".join( + self.eigenstates[int(dig)] + for dig in np.base_repr(index, base=self.qudit_dim).zfill( + self.n_qudits + ) + ) + + @abstractmethod + def overlap(self: StateType, other: StateType, /) -> ReturnScalarType: + """Compute the overlap between this state and another of the same type. + + Generally computes Tr[AB] for mixed states A and B, which + corresponds to ||^2 for pure states A=|a> Counter[str]: + """Sample bitstrings from the state, taking into account error rates. + + Args: + num_shots: How many bitstrings to sample. + one_state: The eigenstate that measures to 1. Can be left undefined + if the state's eigenstates form a known eigenbasis with a + defined "one state". + p_false_pos: The rate at which a 0 is read as a 1. + p_false_neg: The rate at which a 1 is read as a 0. + + Returns: + The measured bitstrings, by count. + """ + pass + + @classmethod + @abstractmethod + def from_state_amplitudes( + cls: Type[StateType], + *, + eigenstates: Sequence[Eigenstate], + amplitudes: dict[str, ArgScalarType], + ) -> StateType: + """Construct the state from its basis states' amplitudes. + + Args: + eigenstates: The basis states (e.g., ('r', 'g')). + amplitudes: A mapping between basis state combinations and + complex amplitudes. + + Returns: + The state constructed from the amplitudes. + """ + pass + + def infer_one_state(self) -> Eigenstate: + """Infers the state measured as 1 from the eigenstates. + + Only works when the eigenstates form a known eigenbasis with + a well-defined "one state". + """ + eigenstates = set(self.eigenstates) + if eigenstates == {"0", "1"}: + return "1" + if eigenstates == {"r", "g"}: + return "r" + if eigenstates == {"g", "h"}: + return "h" + if eigenstates == {"u", "d"}: + return "d" + raise RuntimeError( + "Failed to infer the 'one state' from the " + f"eigenstates: {self.eigenstates}" + ) + + @staticmethod + def _validate_eigenstates(eigenstates: Sequence[Eigenstate]) -> None: + if any(not isinstance(s, str) or len(s) != 1 for s in eigenstates): + raise ValueError( + "All eigenstates must be represented by single characters." + ) + if len(eigenstates) != len(set(eigenstates)): + raise ValueError("'eigenstates' can't contain repeated entries.") diff --git a/pulser-core/pulser/register/register.py b/pulser-core/pulser/register/register.py index 0f0be2fd8..18f166fcc 100644 --- a/pulser-core/pulser/register/register.py +++ b/pulser-core/pulser/register/register.py @@ -355,10 +355,7 @@ def with_automatic_layout( raise TypeError( f"'device' must be of type Device, not {type(device)}." ) - if ( - self._coords_arr.is_tensor - and self._coords_arr.as_tensor().requires_grad - ): + if self._coords_arr.requires_grad: raise NotImplementedError( "'Register.with_automatic_layout()' does not support " "registers with differentiable coordinates." diff --git a/pulser-core/pulser/result.py b/pulser-core/pulser/result.py index 717fd7ef7..d8b434c6d 100644 --- a/pulser-core/pulser/result.py +++ b/pulser-core/pulser/result.py @@ -14,27 +14,43 @@ """Classes to store measurement results.""" from __future__ import annotations -import collections.abc -import typing +import uuid +import warnings from abc import ABC, abstractmethod from collections import Counter -from dataclasses import dataclass -from typing import Any, TypeVar, overload +from dataclasses import dataclass, field +from typing import Any import matplotlib.pyplot as plt import numpy as np -from pulser.register import QubitId +import pulser.backend.results as backend_results +from pulser.backend.default_observables import BitStrings -__all__ = ["Result", "SampledResult", "Results", "ResultType"] + +def __getattr__(name: str) -> Any: + name_map = {"Results": "ResultsSequence", "ResultType": "ResultsType"} + if name not in name_map: + raise AttributeError(f"Module {__name__!r} has no attribute {name!r}.") + warnings.warn( + f"The 'pulser.result.{name}' class has been renamed to " + f"'{name_map[name]}' and moved to 'pulser.backend.results'. " + "Importing it as '{name}' from 'pulser.results' is deprecated.", + DeprecationWarning, + stacklevel=3, + ) + return getattr(backend_results, name_map[name]) + + +__all__ = ["Result", "SampledResult"] @dataclass -class Result(ABC): - """Base class for storing the result of a sequence run.""" +class Result(ABC, backend_results.Results): + """Base class for storing the result of an observable at specific time.""" - atom_order: tuple[QubitId, ...] meas_basis: str + total_duration: int = field(default=0, init=False) @property def sampling_dist(self) -> dict[str, float]: @@ -136,12 +152,28 @@ class SampledResult(Result): meas_basis: The measurement basis. bitstring_counts: The number of times each bitstring was measured. + evaluation_time: Relative time at which the samples were + taken. """ bitstring_counts: dict[str, int] + evaluation_time: float = 1.0 def __post_init__(self) -> None: + super().__post_init__() self.n_samples = sum(self.bitstring_counts.values()) + # TODO: Make sure this is not too hacky + bitstrings_obs = BitStrings() + # Override UUID so that two SampledResult instances with + # the same counts are identical + bitstrings_obs._uuid = uuid.UUID( + "00000000-0000-0000-0000-000000000000" + ) + self._store( + observable=bitstrings_obs, + time=self.evaluation_time, + value=self.bitstring_counts, + ) @property def sampling_errors(self) -> dict[str, float]: @@ -159,32 +191,3 @@ def _weights(self) -> np.ndarray: for bitstr, counts in self.bitstring_counts.items(): weights[int(bitstr, base=2)] = counts / self.n_samples return weights / sum(weights) - - -ResultType = TypeVar("ResultType", bound=Result) - - -class Results(typing.Sequence[ResultType]): - """An immutable sequence of results.""" - - _results: tuple[ResultType, ...] - - @overload - def __getitem__(self, key: int) -> ResultType: - pass - - @overload - def __getitem__(self, key: slice) -> tuple[ResultType, ...]: - pass - - def __getitem__( - self, key: int | slice - ) -> ResultType | tuple[ResultType, ...]: - return self._results[key] - - def __len__(self) -> int: - return len(self._results) - - def __iter__(self) -> collections.abc.Iterator[ResultType]: - for res in self._results: - yield res diff --git a/pulser-core/pulser/waveforms.py b/pulser-core/pulser/waveforms.py index 6b029011e..b3752d40f 100644 --- a/pulser-core/pulser/waveforms.py +++ b/pulser-core/pulser/waveforms.py @@ -190,7 +190,7 @@ def modulated_samples( The array of samples after modulation. """ detach = True # We detach unless... - if self.samples.is_tensor and self.samples.as_tensor().requires_grad: + if self.samples.requires_grad: # ... the samples require grad. In this case, we clear the cache # so that the modulation is recalculated with the current samples self._modulated_samples.cache_clear() diff --git a/pulser-pasqal/pulser_pasqal/backends.py b/pulser-pasqal/pulser_pasqal/backends.py index 1051178ec..d04160adb 100644 --- a/pulser-pasqal/pulser_pasqal/backends.py +++ b/pulser-pasqal/pulser_pasqal/backends.py @@ -59,7 +59,7 @@ def __init__( def run( self, job_params: list[JobParams] | None = None, wait: bool = False - ) -> RemoteResults | tuple[RemoteResults, ...]: + ) -> RemoteResults: """Executes on the emulator backend through the Pasqal Cloud. Args: diff --git a/pulser-simulation/pulser_simulation/__init__.py b/pulser-simulation/pulser_simulation/__init__.py index cddf5e04f..78598b445 100644 --- a/pulser-simulation/pulser_simulation/__init__.py +++ b/pulser-simulation/pulser_simulation/__init__.py @@ -16,6 +16,8 @@ from pulser import EmulatorConfig, NoiseModel from pulser_simulation._version import __version__ as __version__ +from pulser_simulation.qutip_state import QutipState +from pulser_simulation.qutip_op import QutipOperator from pulser_simulation.qutip_backend import QutipBackend from pulser_simulation.simconfig import SimConfig from pulser_simulation.simulation import QutipEmulator @@ -23,6 +25,8 @@ __all__ = [ "EmulatorConfig", "NoiseModel", + "QutipState", + "QutipOperator", "QutipBackend", "QutipEmulator", "SimConfig", diff --git a/pulser-simulation/pulser_simulation/qutip_op.py b/pulser-simulation/pulser_simulation/qutip_op.py new file mode 100644 index 000000000..876ab5271 --- /dev/null +++ b/pulser-simulation/pulser_simulation/qutip_op.py @@ -0,0 +1,270 @@ +# Copyright 2024 Pulser Development Team +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""Definition of QutipState and QutipOperator.""" +from __future__ import annotations + +from collections.abc import Collection, Mapping, Sequence +from typing import Any, SupportsComplex, Type, TypeVar, cast + +import qutip + +from pulser.backend.operator import Operator +from pulser.backend.state import Eigenstate +from pulser_simulation.qutip_state import QutipState + +QutipStateType = TypeVar("QutipStateType", bound=QutipState) +QutipOperatorType = TypeVar("QutipOperatorType", bound="QutipOperator") + +QuditOp = Mapping[str, SupportsComplex] +TensorOp = Sequence[tuple[QuditOp, Collection[int]]] +FullOp = Sequence[tuple[SupportsComplex, TensorOp]] + + +class QutipOperator(Operator[SupportsComplex, complex, QutipStateType]): + """A quantum operator stored as a qutip.Qobj. + + Args: + state: The operator as a qutip.Qobj. + eigenstates: The eigenstates that form a qudit's eigenbasis, each + given as an individual character. The order of the eigenstates + matters, as for eigenstates ("a", "b", ...), "a" will be + associated to eigenvector (1, 0, ...), "b" to (0, 1, ...) and + so on. + + """ + + def __init__( + self, operator: qutip.Qobj, eigenstates: Sequence[Eigenstate] + ): + """Initializes a QutipOperator.""" + QutipState._validate_eigenstates(eigenstates) + self._eigenstates = tuple(eigenstates) + if not isinstance(operator, qutip.Qobj) or not operator.isoper: + raise TypeError( + "'operator' must be a qutip.Qobj with type 'oper', not " + f"{operator!r}." + ) + QutipState._validate_shape(operator.shape, len(self._eigenstates)) + self._operator = operator + + @property + def eigenstates(self) -> tuple[Eigenstate, ...]: + """The eigenstates that form a qudit's eigenbasis. + + The order of the states should match the order in a numerical (ie + state vector or density matrix) representation. + For example, with eigenstates ("a", "b", ...), "a" will be associated + to eigenvector (1, 0, ...), "b" to (0, 1, ...) and so on. + """ + return tuple(self._eigenstates) + + def to_qobj(self) -> qutip.Qobj: + """Returns a copy of the operators's Qobj representation.""" + return self._operator.copy() + + def apply_to(self, state: QutipStateType, /) -> QutipStateType: + """Apply the operator to a state. + + Args: + state: The state to apply this operator to. + + Returns: + The resulting state. + """ + self._validate_other(state, QutipState, "QutipOperator.apply_to()") + out = self._operator * state._state + if state._state.isoper: + out = out * self._operator.dag() + return type(state)(out, eigenstates=state.eigenstates) + + def expect(self, state: QutipState, /) -> complex: + """Compute the expectation value of self on the given state. + + Args: + state: The state with which to compute. + + Returns: + The expectation value. + """ + self._validate_other(state, QutipState, "QutipOperator.expect()") + return cast(complex, qutip.expect(self._operator, state._state)) + + def __add__( + self: QutipOperatorType, other: QutipOperatorType, / + ) -> QutipOperatorType: + """Computes the sum of two operators. + + Args: + other: The other operator. + + Returns: + The summed operator. + """ + self._validate_other(other, QutipOperator, "__add__") + return type(self)( + self._operator + other._operator, eigenstates=self.eigenstates + ) + + def __rmul__( + self: QutipOperatorType, scalar: SupportsComplex + ) -> QutipOperatorType: + """Scale the operator by a scalar factor. + + Args: + scalar: The scalar factor. + + Returns: + The scaled operator. + """ + return type(self)( + complex(scalar) * self._operator, eigenstates=self.eigenstates + ) + + def __matmul__( + self: QutipOperatorType, other: QutipOperatorType + ) -> QutipOperatorType: + """Compose two operators where 'self' is applied after 'other'. + + Args: + other: The operator to compose with self. + + Returns: + The composed operator. + """ + self._validate_other(other, QutipOperator, "__matmul__") + return type(self)( + self._operator * other._operator, eigenstates=self.eigenstates + ) + + @classmethod + def from_operator_repr( + cls: Type[QutipOperatorType], + *, + eigenstates: Sequence[Eigenstate], + n_qudits: int, + operations: FullOp, + ) -> QutipOperatorType: + """Create an operator from the operator representation. + + The full operator representation (FullOp is a weigthed sum of tensor + operators (TensorOp), written as a sequence of coefficient and tensor + operator pairs, ie + + `FullOp = Sequence[tuple[ScalarType, TensorOp]]` + + Each TensorOp is itself a sequence of qudit operators (QuditOp) applied + to mutually exclusive sets of qudits (represented by their indices), ie + + `TensorOp = Sequence[tuple[QuditOp, Collection[int]]]` + + Qudits without an associated QuditOp are applied the identity operator. + + Finally, each QuditOp is represented as weighted sum of pre-defined + single-qudit operators. It is given as a mapping between a string + representation of the single-qudit operator and its respective + coefficient, ie + + `QuditOp = Mapping[str, ScalarType]` + + By default it identifies strings 'ij' as single-qudit operators, where + i and j are eigenstates that denote |i> qutip.Qobj: + op = qutip.identity(qudit_dim) * 0 + for proj_str, coeff in qudit_op.items(): + if len(proj_str) != 2 or any( + s_ not in eigenstates for s_ in proj_str + ): + raise ValueError( + "Every QuditOp key must be made up of two eigenstates;" + f" instead, got '{proj_str}'." + ) + ket = qutip.basis(qudit_dim, eigenstates.index(proj_str[0])) + bra = qutip.basis( + qudit_dim, eigenstates.index(proj_str[1]) + ).dag() + op += complex(coeff) * ket * bra + return op + + coeffs: list[complex] = [] + tensor_ops: list[qutip.Qobj] = [] + for tensor_op_num, (coeff, tensor_op) in enumerate(operations): + coeffs.append(complex(coeff)) + qudit_ops = [qutip.identity(qudit_dim) for _ in range(n_qudits)] + free_inds = set(range(n_qudits)) + for qudit_op, qudit_inds in tensor_op: + if bad_inds_ := (set(qudit_inds) - free_inds): + raise ValueError( + "Got invalid indices for a system with " + f"{n_qudits} qudits: {bad_inds_}. For TensorOp " + f"#{tensor_op_num}, only indices {free_inds} " + "were still available." + ) + for ind in qudit_inds: + qudit_ops[ind] = build_qudit_op(qudit_op) + free_inds.remove(ind) + tensor_ops.append(qutip.tensor(qudit_ops)) + + full_op: qutip.Qobj = sum(c * t for c, t in zip(coeffs, tensor_ops)) + return cls(full_op, eigenstates=eigenstates) + + def __repr__(self) -> str: + return "\n".join( + [ + "QutipOperator", + "-------------", + f"Eigenstates: {self.eigenstates}", + self._operator.__repr__(), + ] + ) + + def __eq__(self, other: Any) -> bool: + if not isinstance(other, QutipOperator): + return False + return ( + self.eigenstates == other.eigenstates + and self._operator == other._operator + ) + + def _validate_other( + self, + other: QutipState | QutipOperator, + expected_type: Type, + op_name: str, + ) -> None: + if not isinstance(other, expected_type): + raise TypeError( + f"'{op_name}' expects a '{expected_type.__name__}' instance, " + f"not {type(other)}." + ) + if self.eigenstates != other.eigenstates: + msg = ( + f"Can't apply {op_name} between a {self.__class__.__name__} " + f"with eigenstates {self.eigenstates} and a " + f"{other.__class__.__name__} with {other.eigenstates}." + ) + if set(self.eigenstates) != set(other.eigenstates): + raise ValueError(msg) + raise NotImplementedError(msg) diff --git a/pulser-simulation/pulser_simulation/qutip_result.py b/pulser-simulation/pulser_simulation/qutip_result.py index 1ae584ae2..5dfeeac37 100644 --- a/pulser-simulation/pulser_simulation/qutip_result.py +++ b/pulser-simulation/pulser_simulation/qutip_result.py @@ -25,7 +25,6 @@ States, get_states_from_bases, ) -from pulser.register import QubitId from pulser.result import Result @@ -43,10 +42,12 @@ class QutipResult(Result): same as the state's basis. """ - atom_order: tuple[QubitId, ...] - meas_basis: str state: qutip.Qobj matching_meas_basis: bool + evaluation_time: float = 1.0 + + def __post_init__(self) -> None: + super().__post_init__() @property def sampling_errors(self) -> dict[str, float]: diff --git a/pulser-simulation/pulser_simulation/qutip_state.py b/pulser-simulation/pulser_simulation/qutip_state.py new file mode 100644 index 000000000..2be563581 --- /dev/null +++ b/pulser-simulation/pulser_simulation/qutip_state.py @@ -0,0 +1,286 @@ +# Copyright 2024 Pulser Development Team +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""Definition of QutipState and QutipOperator.""" +from __future__ import annotations + +import math +from collections import Counter, defaultdict +from collections.abc import Collection, Mapping, Sequence +from typing import Any, SupportsComplex, Type, TypeVar + +import numpy as np +import qutip + +from pulser.backend.state import Eigenstate, State + +QutipStateType = TypeVar("QutipStateType", bound="QutipState") + +QuditOp = Mapping[str, SupportsComplex] +TensorOp = Sequence[tuple[QuditOp, Collection[int]]] +FullOp = Sequence[tuple[SupportsComplex, TensorOp]] + + +class QutipState(State[SupportsComplex, complex]): + """A quantum state stored as a qutip.Qobj. + + Args: + state: The state as a qutip.Qobj. Can be a statevector + or a density matrix. + eigenstates: The eigenstates that form a qudit's eigenbasis, each + given as an individual character. The order of the eigenstates + matters, as for eigenstates ("a", "b", ...), "a" will be + associated to eigenvector (1, 0, ...), "b" to (0, 1, ...) and + so on. + """ + + def __init__( + self, state: qutip.Qobj, *, eigenstates: Sequence[Eigenstate] + ): + """Initializes a QutipState.""" + self._validate_eigenstates(eigenstates) + self._eigenstates = tuple(eigenstates) + valid_types = ("ket", "bra", "oper") + if not isinstance(state, qutip.Qobj) or state.type not in valid_types: + raise TypeError( + "'state' must be a qutip.Qobj with one of types " + f"{valid_types}, not {state!r}." + ) + self._state = state.dag() if state.isbra else state + self._validate_shape(self._state.shape, self.qudit_dim) + + @property + def n_qudits(self) -> int: + """The number of qudits in the state.""" + return round(math.log(self._state.shape[0], self.qudit_dim)) + + def to_qobj(self) -> qutip.Qobj: + """Returns a copy of the state's Qobj representation.""" + return self._state.copy() + + def overlap(self, other: QutipState) -> float: + """Compute the overlap between this state and another of the same type. + + Generally computes Tr[AB] for mixed states A and B, which + corresponds to ||^2 for pure states A=|a> dict[str, float]: + """Extracts the probabilties of measuring each basis state combination. + + The probabilities are normalized to sum to 1. + + Args: + cutoff: The value below which a probability is considered to be + zero. + + Returns: + A mapping between basis state combinations and their respective + probabilities. + """ + if not self._state.isket: + probs = np.abs(self._state.diag()).real + else: + probs = (np.abs(self._state.full()) ** 2).flatten().real + non_zero = np.argwhere(probs > cutoff).flatten() + # Renormalize to make the non-zero probablilites sum to 1. + probs = probs[non_zero] + probs = probs / np.sum(probs) + return dict(zip(map(self.get_basis_state_from_index, non_zero), probs)) + + def bitstring_probabilities( + self, *, one_state: Eigenstate | None = None, cutoff: float = 1e-10 + ) -> Mapping[str, float]: + """Extracts the probabilties of measuring each bitstring. + + Args: + one_state: The eigenstate that measures to 1. Can be left undefined + if the state's eigenstates form a known eigenbasis with a + defined "one state". + cutoff: The value below which a probability is considered to be + zero. + + Returns: + A mapping between bitstrings and their respective probabilities. + """ + one_state = one_state or self.infer_one_state() + zero_states = set(self.eigenstates) - {one_state} + probs = self.probabilities(cutoff=cutoff) + bitstring_probs: dict[str, float] = defaultdict(float) + for state_str in probs: + bitstring = state_str.replace(one_state, "1") + for s_ in zero_states: + bitstring = bitstring.replace(s_, "0") + bitstring_probs[bitstring] += probs[state_str] + return dict(bitstring_probs) + + def sample( + self, + *, + num_shots: int, + one_state: Eigenstate | None = None, + p_false_pos: float = 0.0, + p_false_neg: float = 0.0, + ) -> Counter[str]: + """Sample bitstrings from the state, taking into account error rates. + + Args: + num_shots: How many bitstrings to sample. + one_state: The eigenstate that measures to 1. Can be left undefined + if the state's eigenstates form a known eigenbasis with a + defined "one state". + p_false_pos: The rate at which a 0 is read as a 1. + p_false_neg: The rate at which a 1 is read as a 0. + + Returns: + The measured bitstrings, by count. + """ + bitstring_probs = self.bitstring_probabilities( + one_state=one_state, cutoff=1 / (1000 * num_shots) + ) + bitstrings = np.array(list(bitstring_probs)) + probs = np.array(list(map(float, bitstring_probs.values()))) + dist = np.random.multinomial(num_shots, probs) + # Filter out bitstrings without counts + non_zero_counts = dist > 0 + bitstrings = bitstrings[non_zero_counts] + dist = dist[non_zero_counts] + if p_false_pos == 0.0 and p_false_neg == 0.0: + return Counter(dict(zip(bitstrings, dist))) + + # Convert bitstrings to a 2D array + bitstr_arr = np.array([list(bs) for bs in bitstrings], dtype=int) + # If 1 is measured, flip_prob=p_false_neg else flip_prob=p_false_pos + flip_probs = np.where(bitstr_arr == 1, p_false_neg, p_false_pos) + # Repeat flip_probs of a bitstring as many times as it was measured + flip_probs_repeated = np.repeat(flip_probs, dist, axis=0) + # Generate random matrix of same shape + random_matrix = np.random.uniform(size=flip_probs_repeated.shape) + # Compare random matrix with flip probabilities to get the flips + flips = random_matrix < flip_probs_repeated + # Apply the flips with an XOR between original array and flips + new_bitstrings = bitstr_arr.repeat(dist, axis=0) ^ flips + + # Count all the new_bitstrings + # Not converting to str right away because tuple indexing is faster + new_counts: Counter = Counter(map(tuple, new_bitstrings)) + return Counter( + {"".join(map(str, k)): v for k, v in new_counts.items()} + ) + + @classmethod + def from_state_amplitudes( + cls: Type[QutipStateType], + *, + eigenstates: Sequence[Eigenstate], + amplitudes: dict[str, SupportsComplex], + ) -> QutipStateType: + """Construct the state from its basis states' amplitudes. + + Args: + eigenstates: The basis states (e.g., ('r', 'g')). + amplitudes: A mapping between basis state combinations and + complex amplitudes. + + Returns: + The state constructed from the amplitudes. + """ + cls._validate_eigenstates(eigenstates) + basis_states = list(amplitudes) + n_qudits = len(basis_states[0]) + if not all( + len(bs) == n_qudits and set(bs) <= set(eigenstates) + for bs in basis_states + ): + raise ValueError( + "All basis states must be combinations of eigenstates with the" + f" same length. Expected combinations of {eigenstates}, each " + f"with {n_qudits} elements." + ) + + qudit_dim = len(eigenstates) + + def make_qobj(basis_state: str) -> qutip.Qobj: + return qutip.tensor( + [ + qutip.basis(qudit_dim, eigenstates.index(s)) + for s in basis_state + ] + ) + + # Start with an empty Qobj with the right dimension + state = make_qobj(eigenstates[0] * n_qudits) * 0 + for basis_state, amp in amplitudes.items(): + state += complex(amp) * make_qobj(basis_state) + + return cls(state, eigenstates=eigenstates) + + def __repr__(self) -> str: + return "\n".join( + [ + "QutipState", + "----------", + f"Eigenstates: {self.eigenstates}", + self._state.__repr__(), + ] + ) + + def __eq__(self, other: Any) -> bool: + if not isinstance(other, QutipState): + return False + return ( + self.eigenstates == other.eigenstates + and self._state == other._state + ) + + @staticmethod + def _validate_shape(shape: tuple[int, int], qudit_dim: int) -> None: + expected_n_qudits = math.log(shape[0], qudit_dim) + if not np.isclose(expected_n_qudits, round(expected_n_qudits)): + raise ValueError( + f"A qutip.Qobj with shape {shape} is incompatible with " + f"a system of {qudit_dim}-level qudits." + ) diff --git a/pulser-simulation/pulser_simulation/simresults.py b/pulser-simulation/pulser_simulation/simresults.py index c4156fc61..e3e490e6c 100644 --- a/pulser-simulation/pulser_simulation/simresults.py +++ b/pulser-simulation/pulser_simulation/simresults.py @@ -20,7 +20,7 @@ from abc import ABC, abstractmethod from collections import Counter from functools import lru_cache -from typing import Mapping, Optional, Tuple, Union, cast +from typing import Mapping, Optional, Tuple, TypeVar, Union, cast import matplotlib.pyplot as plt import numpy as np @@ -28,11 +28,14 @@ from numpy.typing import ArrayLike from qutip.piqs.piqs import isdiagonal -from pulser.result import Results, ResultType, SampledResult +from pulser.backend.results import ResultsSequence +from pulser.result import SampledResult from pulser_simulation.qutip_result import QutipResult +ResultType = TypeVar("ResultType", SampledResult, QutipResult) -class SimulationResults(ABC, Results[ResultType]): + +class SimulationResults(ABC, ResultsSequence[ResultType]): """Results of a simulation run of a pulse sequence. Parent class for NoisyResults and CoherentResults. @@ -224,7 +227,7 @@ def _meas_projector(self, state_n: int) -> qutip.Qobj: return qutip.basis(2, state_n).proj() -class NoisyResults(SimulationResults): +class NoisyResults(SimulationResults[SampledResult]): """Results of a noisy simulation run of a pulse sequence. Contrary to a CoherentResults object, this object contains a list of @@ -275,7 +278,7 @@ def __init__( basis_name_ = "digital" if basis == "all" else basis super().__init__(size, basis_name_, sim_times) self.n_measures = n_measures - self._results = tuple(run_output) + self._results_seq = tuple(run_output) @property def states(self) -> list[qutip.Qobj]: @@ -355,7 +358,7 @@ def get_error_bars() -> Tuple[ArrayLike, ArrayLike]: super().plot(op, fmt, label) -class CoherentResults(SimulationResults): +class CoherentResults(SimulationResults[QutipResult]): """Results of a coherent simulation run of a pulse sequence. Contains methods for studying the states and extracting useful information @@ -403,7 +406,7 @@ def __init__( f"{self._basis_name}' must be '{expected_meas_basis}'." ) self._meas_basis = meas_basis - self._results = tuple(run_output) + self._results_seq = tuple(run_output) if meas_errors is not None: if set(meas_errors) != {"epsilon", "epsilon_prime"}: raise ValueError( diff --git a/pulser-simulation/pulser_simulation/simulation.py b/pulser-simulation/pulser_simulation/simulation.py index 653ef20d0..b56ea5388 100644 --- a/pulser-simulation/pulser_simulation/simulation.py +++ b/pulser-simulation/pulser_simulation/simulation.py @@ -598,8 +598,9 @@ def _run_solver() -> CoherentResults: self._meas_basis, state, self._meas_basis in self.basis_name, + evaluation_time=t / self._tot_duration * 1e3, ) - for state in result.states + for state, t in zip(result.states, self._eval_times_array) ] return CoherentResults( results, @@ -652,8 +653,7 @@ def _run_solver() -> CoherentResults: update_ham = True # Will return NoisyResults - time_indices = range(len(self._eval_times_array)) - total_count = np.array([Counter() for _ in time_indices]) + total_count = np.array([Counter() for _ in self._eval_times_array]) # We run the system multiple times for i in range(loop_runs): if not update_ham: @@ -685,9 +685,10 @@ def _run_solver() -> CoherentResults: SampledResult( tuple(self._hamiltonian._qdict), self._meas_basis, - total_count[t], + total_count[ind], + evaluation_time=t / self._tot_duration * 1e3, ) - for t in time_indices + for ind, t in enumerate(self._eval_times_array) ] return NoisyResults( results, diff --git a/tests/test_backend.py b/tests/test_backend.py index 358743a57..30c98a879 100644 --- a/tests/test_backend.py +++ b/tests/test_backend.py @@ -15,12 +15,30 @@ import re import typing +import uuid +from collections import Counter +import numpy as np import pytest import pulser -from pulser.backend.abc import Backend -from pulser.backend.config import EmulatorConfig +from pulser.backend.abc import Backend, EmulatorBackend +from pulser.backend.config import ( + BackendConfig, + EmulationConfig, + EmulatorConfig, +) +from pulser.backend.default_observables import ( + BitStrings, + CorrelationMatrix, + Energy, + EnergyVariance, + Expectation, + Fidelity, + Occupation, + SecondMomentOfEnergy, + StateResult, +) from pulser.backend.qpu import QPUBackend from pulser.backend.remote import ( BatchStatus, @@ -30,9 +48,11 @@ RemoteResultsError, _OpenBatchContextManager, ) +from pulser.backend.results import Results from pulser.devices import AnalogDevice, MockDevice from pulser.register import SquareLatticeLayout from pulser.result import Result, SampledResult +from pulser_simulation import QutipOperator, QutipState @pytest.fixture @@ -239,3 +259,401 @@ def test_qpu_backend(sequence): available_results = remote_results.get_available_results() assert available_results == {"abcd": connection.result} + + +def test_emulator_backend(sequence): + + class ConcreteEmulator(EmulatorBackend): + + default_config = EmulationConfig(with_modulation=True) + + def run(self): + pass + + with pytest.raises( + TypeError, match="must be an instance of 'EmulationConfig'" + ): + ConcreteEmulator(sequence, config=EmulatorConfig) + + emu = ConcreteEmulator( + sequence, config=EmulationConfig(default_evaluation_times="Full") + ) + assert emu._config.default_evaluation_times == "Full" + assert not emu._config.with_modulation + + # Uses the default config + assert ConcreteEmulator(sequence)._config.with_modulation + + +def test_backend_config(): + with pytest.warns( + UserWarning, + match="'BackendConfig' received unexpected keyword arguments", + ): + config1 = BackendConfig(prefer_device_noise_model=True) + assert config1.prefer_device_noise_model + + with pytest.raises(AttributeError, match="'dt' has not been passed"): + config1.dt + + with pytest.warns( + DeprecationWarning, + match="The 'backend_options' argument of 'BackendConfig' has been " + "deprecated", + ): + config2 = BackendConfig(backend_options={"dt": 10}) + assert config2.backend_options["dt"] == 10 + assert config2.dt == 10 + + +def test_emulation_config(): + with pytest.raises( + TypeError, + match="All entries in 'observables' must be instances of Observable", + ): + EmulationConfig(observables=["fidelity"]) + with pytest.raises( + ValueError, + match="Some of the provided 'observables' share identical tags", + ): + EmulationConfig( + observables=[BitStrings(), BitStrings(num_shots=200000)] + ) + with pytest.raises( + ValueError, match="All evaluation times must be between 0. and 1." + ): + EmulationConfig(default_evaluation_times=[-1e15, 0.0, 0.5, 1.0]) + with pytest.raises( + TypeError, match="'initial_state' must be an instance of State" + ): + EmulationConfig(initial_state=[[1], [0]]) + with pytest.raises( + ValueError, + match=re.escape( + "'interaction_matrix' must be a square matrix. Instead, an array" + " of shape (4, 3) was given" + ), + ): + EmulationConfig(interaction_matrix=np.arange(12).reshape((4, 3))) + with pytest.raises( + ValueError, + match=re.escape( + "interaction matrix of shape (2, 2) is incompatible with " + "the received initial state of 3 qudits" + ), + ): + EmulationConfig( + interaction_matrix=np.eye(2), + initial_state=QutipState.from_state_amplitudes( + eigenstates=("r", "g"), amplitudes={"rrr": 1.0} + ), + ) + with pytest.raises(TypeError, match="must be a NoiseModel"): + EmulationConfig(noise_model={"p_false_pos": 0.1}) + + +def test_results(): + res = Results(atom_order=(), total_duration=0) + assert res.get_result_tags() == [] + assert res.get_tagged_results() == {} + with pytest.raises( + AttributeError, match="'bitstrings' is not in the results" + ): + assert res.bitstrings + with pytest.raises( + ValueError, + match="'bitstrings' is not an Observable instance nor a known " + "observable tag", + ): + assert res.get_result_times("bitstrings") + + obs = BitStrings(tag_suffix="test") + with pytest.raises( + ValueError, + match=f"'bitstrings_test:{obs.uuid}' has not been stored", + ): + assert res.get_result(obs, 1.0) + + obs( + config=EmulationConfig(), + t=1.0, + state=QutipState.from_state_amplitudes( + eigenstates=("r", "g"), amplitudes={"rrr": 1.0} + ), + hamiltonian=QutipOperator.from_operator_repr( + eigenstates=("r", "g"), n_qudits=3, operations=[(1.0, [])] + ), + result=res, + ) + assert res.get_result_tags() == ["bitstrings_test"] + expected_val = {1.0: Counter({"111": obs.num_shots})} + assert res.get_tagged_results() == {"bitstrings_test": expected_val} + assert res.bitstrings_test == expected_val + assert ( + res.get_result_times("bitstrings_test") + == res.get_result_times(obs) + == [1.0] + ) + assert ( + res.get_result(obs, 1.0) + == res.get_result("bitstrings_test", 1.0) + == expected_val[1.0] + ) + with pytest.raises(ValueError, match="not available at time 0.912"): + res.get_result(obs, 0.912) + + +class TestObservables: + @pytest.fixture + def ghz_state(self): + return QutipState.from_state_amplitudes( + eigenstates=("r", "g"), + amplitudes={"rrr": np.sqrt(0.5), "ggg": np.sqrt(0.5)}, + ) + + @pytest.fixture + def ham(self): + return QutipOperator.from_operator_repr( + eigenstates=("r", "g"), + n_qudits=3, + operations=[(1.0, [])], + ) + + @pytest.fixture + def config(self): + return EmulationConfig() + + @pytest.fixture + def results(self): + return Results(atom_order=("q0", "q1", "q2"), total_duration=1000) + + @pytest.mark.parametrize("tag_suffix", [None, "foo"]) + @pytest.mark.parametrize("eval_times", [None, (0.0, 0.5, 1.0)]) + def test_base_init(self, eval_times, tag_suffix): + # We use StateResult because Observable is an ABC + obs = StateResult(evaluation_times=eval_times, tag_suffix=tag_suffix) + assert isinstance(obs.uuid, uuid.UUID) + assert obs.evaluation_times == eval_times + expected_tag = "state_foo" if tag_suffix else "state" + assert obs.tag == expected_tag + assert repr(obs) == f"{expected_tag}:{obs.uuid}" + with pytest.raises( + ValueError, match="All evaluation times must be between 0. and 1." + ): + StateResult(evaluation_times=[1.000001]) + + @pytest.mark.parametrize("eval_times", [None, (0.0, 0.5, 1.0)]) + def test_call( + self, + config: EmulationConfig, + results: Results, + ghz_state, + ham, + eval_times, + ): + assert not results.get_result_tags() # ie it's empty + assert config.default_evaluation_times == (1.0,) + # We use StateResult because Observable is an ABC + obs = StateResult(evaluation_times=eval_times) + assert obs.apply(state=ghz_state) == ghz_state + true_eval_times = eval_times or config.default_evaluation_times + + t_ = 0.1 + assert not config.is_time_in_evaluation_times(t_, true_eval_times) + obs(config, t_, ghz_state, ham, results) + assert not results.get_result_tags() # ie it's still empty + + t_ = 1.0 + assert config.is_time_in_evaluation_times(t_, true_eval_times) + obs(config, t_, ghz_state, ham, results) + assert results.get_result_tags() == ["state"] + assert ( + results.get_result_times("state") + == results.get_result_times(obs) + == [t_] + ) + assert results.get_result(obs, t_) == ghz_state + with pytest.raises( + RuntimeError, + match="A value is already stored for observable 'state' at time " + f"{t_}", + ): + obs(config, t_, ghz_state, ham, results) + + expected_tol = 0.5 / results.total_duration + t_minus_tol = t_ - expected_tol + assert config.is_time_in_evaluation_times( + t_minus_tol, true_eval_times, tol=expected_tol + ) + obs(config, t_minus_tol, ghz_state, ham, results) + assert results.get_result_times(obs) == [t_, t_minus_tol] + assert results.get_result(obs, t_minus_tol) == ghz_state + + t_plus_tol = t_ + expected_tol + assert t_plus_tol > 1.0 # ie it's not an evaluation time + assert not config.is_time_in_evaluation_times( + t_plus_tol, true_eval_times, tol=expected_tol + ) + obs(config, t_plus_tol, ghz_state, ham, results) + assert t_plus_tol not in results.get_result_times(obs) + + def test_state_result(self, ghz_state): + obs = StateResult() + assert obs.apply(state=ghz_state) == ghz_state + + @pytest.mark.parametrize("p_false_pos", [None, 0.4]) + @pytest.mark.parametrize("p_false_neg", [None, 0.3]) + @pytest.mark.parametrize("one_state", [None, "g"]) + @pytest.mark.parametrize("num_shots", [None, 100]) + def test_bitstrings( + self, + config: EmulationConfig, + ghz_state: QutipState, + num_shots, + one_state, + p_false_pos, + p_false_neg, + ): + with pytest.raises(ValueError, match="greater than or equal to 1"): + BitStrings(num_shots=0) + kwargs = {} + if num_shots: + kwargs["num_shots"] = num_shots + obs = BitStrings(one_state=one_state, **kwargs) + assert obs.tag == "bitstrings" + noise_model = pulser.NoiseModel( + p_false_pos=p_false_pos, p_false_neg=p_false_neg + ) + config.noise_model = noise_model + assert config.noise_model.noise_types == ( + ("SPAM",) if p_false_pos or p_false_neg else () + ) + np.random.seed(123) + expected_shots = num_shots or obs.num_shots + expected_counts = ghz_state.sample( + num_shots=expected_shots, + one_state=one_state or ghz_state.infer_one_state(), + p_false_pos=p_false_pos or 0, + p_false_neg=p_false_neg or 0, + ) + np.random.seed(123) + counts = obs.apply(config=config, state=ghz_state) + assert isinstance(counts, Counter) + assert sum(counts.values()) == expected_shots + if noise_model == pulser.NoiseModel(): + assert set(counts) == {"000", "111"} + assert counts == expected_counts + + @pytest.mark.parametrize("one_state", [None, "r", "g"]) + def test_correlation_matrix_and_occupation( + self, ghz_state, ham, one_state + ): + corr = CorrelationMatrix(one_state=one_state) + assert corr.tag == "correlation_matrix" + occ = Occupation(one_state=one_state) + assert occ.tag == "occupation" + expected_corr_matrix = np.full((3, 3), 0.5) + np.testing.assert_allclose( + corr.apply(state=ghz_state, hamiltonian=ham), expected_corr_matrix + ) + np.testing.assert_allclose( + occ.apply(state=ghz_state, hamiltonian=ham), + expected_corr_matrix.diagonal(), + ) + + ggg_state = QutipState.from_state_amplitudes( + eigenstates=("r", "g"), amplitudes={"ggg": 1.0} + ) + expected_corr_matrix = np.ones((3, 3)) * int(one_state == "g") + np.testing.assert_allclose( + corr.apply(state=ggg_state, hamiltonian=ham), expected_corr_matrix + ) + np.testing.assert_allclose( + occ.apply(state=ggg_state, hamiltonian=ham), + expected_corr_matrix.diagonal(), + ) + + ggr_state = QutipState.from_state_amplitudes( + eigenstates=("r", "g"), amplitudes={"ggr": 1.0} + ) + if one_state == "g": + expected_corr_matrix = np.array([[1, 1, 0], [1, 1, 0], [0, 0, 0]]) + else: + expected_corr_matrix = np.zeros((3, 3)) + expected_corr_matrix[2, 2] = 1 + np.testing.assert_allclose( + corr.apply(state=ggr_state, hamiltonian=ham), expected_corr_matrix + ) + np.testing.assert_allclose( + occ.apply(state=ggr_state, hamiltonian=ham), + expected_corr_matrix.diagonal(), + ) + + @pytest.fixture + def zzz(self): + return QutipOperator.from_operator_repr( + eigenstates=("r", "g"), + n_qudits=3, + operations=[(1.0, [({"rr": 1.0, "gg": -1.0}, {0, 1, 2})])], + ) + + def test_energy_observables(self, ghz_state, ham, zzz): + energy = Energy() + assert energy.tag == "energy" + var = EnergyVariance() + assert var.tag == "energy_variance" + energy2 = SecondMomentOfEnergy() + assert energy2.tag == "second_moment_of_energy" + assert np.isclose(energy.apply(state=ghz_state, hamiltonian=ham), 1.0) + assert np.isclose(energy2.apply(state=ghz_state, hamiltonian=ham), 1.0) + assert np.isclose(var.apply(state=ghz_state, hamiltonian=ham), 0.0) + + assert np.isclose(energy.apply(state=ghz_state, hamiltonian=zzz), 0.0) + assert np.isclose(energy2.apply(state=ghz_state, hamiltonian=zzz), 1.0) + assert np.isclose(var.apply(state=ghz_state, hamiltonian=zzz), 1.0) + + custom_op = QutipOperator.from_operator_repr( + eigenstates=("r", "g"), + n_qudits=3, + operations=[(1.0, [({"gg": -1}, {0, 1, 2})])], + ) + assert np.isclose( + energy.apply(state=ghz_state, hamiltonian=custom_op), -0.5 + ) + assert np.isclose( + energy2.apply(state=ghz_state, hamiltonian=custom_op), 0.5 + ) + assert np.isclose( + var.apply(state=ghz_state, hamiltonian=custom_op), 0.25 + ) + + def test_expectation(self, ghz_state, ham, zzz): + with pytest.raises( + TypeError, match="'operator' must be an Operator instance" + ): + Expectation(ham.to_qobj()) + h_exp = Expectation(ham) + assert h_exp.tag == "expectation" + assert h_exp.apply(state=ghz_state) == ham.expect(ghz_state) + z_exp = Expectation(zzz, tag_suffix="zzz") + assert z_exp.tag == "expectation_zzz" + assert z_exp.apply(state=ghz_state) == zzz.expect(ghz_state) + + def test_fidelity(self, ghz_state): + with pytest.raises( + TypeError, match="'state' must be a State instance" + ): + Fidelity(ghz_state.to_qobj()) + + fid_ggg = Fidelity( + QutipState.from_state_amplitudes( + eigenstates=("r", "g"), amplitudes={"ggg": 1.0} + ), + tag_suffix="ggg", + ) + assert fid_ggg.tag == "fidelity_ggg" + assert np.isclose(fid_ggg.apply(state=ghz_state), 0.5) + + fid_ghz = Fidelity(ghz_state) + assert fid_ghz.tag == "fidelity" + assert np.isclose(fid_ghz.apply(state=ghz_state), 1.0) diff --git a/tests/test_pasqal.py b/tests/test_pasqal.py index 4a8e11382..19184ceb3 100644 --- a/tests/test_pasqal.py +++ b/tests/test_pasqal.py @@ -196,7 +196,7 @@ def test_remote_results(fixt, mock_batch, with_job_id): for job in select_jobs ) - assert hasattr(remote_results, "_results") + assert hasattr(remote_results, "_results_seq") fixt.mock_cloud_sdk.get_batch.reset_mock() available_results = remote_results.get_available_results() @@ -472,7 +472,7 @@ def test_submit(fixt, parametrized, emulator, mimic_qpu, seq, mock_batch): ) for _job in mock_batch.ordered_jobs ) - assert hasattr(remote_results, "_results") + assert hasattr(remote_results, "_results_seq") @pytest.mark.parametrize("emu_cls", [EmuTNBackend, EmuFreeBackend]) diff --git a/tests/test_qutip_state_op.py b/tests/test_qutip_state_op.py new file mode 100644 index 000000000..b26164080 --- /dev/null +++ b/tests/test_qutip_state_op.py @@ -0,0 +1,536 @@ +# Copyright 2024 Pulser Development Team +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +from __future__ import annotations + +import re + +import numpy as np +import pytest +import qutip + +from pulser_simulation import QutipOperator, QutipState + + +@pytest.fixture +def ket_r(): + return QutipState(qutip.basis(2, 0), eigenstates=("r", "g")) + + +@pytest.fixture +def dm_g(): + return QutipState( + qutip.basis(2, 1) * qutip.basis(2, 1).dag(), eigenstates=("r", "g") + ) + + +@pytest.fixture +def ket_plus(): + return QutipState.from_state_amplitudes( + eigenstates=("r", "g"), + amplitudes={"r": 1 / np.sqrt(2), "g": 1 / np.sqrt(2)}, + ) + + +class TestQutipState: + + def test_init(self): + with pytest.raises( + ValueError, + match="eigenstates must be represented by single characters", + ): + QutipState(qutip.basis(2, 0), eigenstates=["ground", "rydberg"]) + with pytest.raises(ValueError, match="can't contain repeated entries"): + QutipState(qutip.basis(2, 0), eigenstates=["r", "g", "r"]) + with pytest.raises(TypeError, match="must be a qutip.Qobj"): + QutipState(np.arange(16), eigenstates=["r", "g"]) + with pytest.raises( + TypeError, match="must be a qutip.Qobj with one of types" + ): + QutipState( + qutip.operator_to_vector(qutip.ket2dm(qutip.basis(2, 0))), + eigenstates=["r", "g"], + ) + + with pytest.raises( + ValueError, match="incompatible with a system of 3-level qudits" + ): + QutipState(qutip.basis(2, 0), eigenstates=["r", "g", "h"]) + state = QutipState( + qutip.basis(3, 0).dag(), eigenstates=["r", "g", "h"] + ) + assert state.n_qudits == 1 + assert state.qudit_dim == 3 + assert state.eigenstates == ("r", "g", "h") + assert state.to_qobj() == qutip.basis(3, 0) + with pytest.raises( + RuntimeError, match="Failed to infer the 'one state'" + ): + state.infer_one_state() + + three_qubit_qobj = qutip.tensor([qutip.basis(2, 1)] * 3) + state = QutipState(three_qubit_qobj, eigenstates=("r", "g")) + assert state.n_qudits == 3 + assert state.qudit_dim == 2 + assert state.eigenstates == ("r", "g") + assert state.to_qobj() == three_qubit_qobj + assert state.infer_one_state() == "r" + + two_qutrit_dm = qutip.ket2dm(qutip.tensor([qutip.basis(3, 0)] * 2)) + state = QutipState(two_qutrit_dm, eigenstates=["r", "g", "h"]) + assert state.n_qudits == 2 + assert state.qudit_dim == 3 + assert state.to_qobj() == two_qutrit_dm + + @pytest.mark.parametrize( + "eigenstates", [("g", "r"), ("g", "h"), ("u", "d"), ("0", "1")] + ) + def test_infer_one_state(self, eigenstates): + assert ( + QutipState( + qutip.basis(2, 0), eigenstates=eigenstates + ).infer_one_state() + == eigenstates[1] + ) + + def test_get_basis_state(self): + n_qudits = 3 + state = QutipState.from_state_amplitudes( + eigenstates=("r", "g", "h"), amplitudes={"g" * n_qudits: 1.0} + ) + assert state.get_basis_state_from_index(0) == "rrr" + assert state.get_basis_state_from_index(1) == "rrg" + assert state.get_basis_state_from_index(2) == "rrh" + assert state.get_basis_state_from_index(3) == "rgr" + assert state.get_basis_state_from_index(4) == "rgg" + assert state.get_basis_state_from_index(9) == "grr" + assert state.get_basis_state_from_index(3**n_qudits - 1) == "hhh" + + with pytest.raises( + ValueError, match="'index' must be a non-negative integer" + ): + state.get_basis_state_from_index(-1) + + def test_overlap( + self, ket_r: QutipState, dm_g: QutipState, ket_plus: QutipState + ): + assert ket_r.overlap(ket_r) == 1.0 + assert dm_g.overlap(ket_r) == ket_r.overlap(dm_g) == 0.0 + assert ket_plus.overlap(ket_r) == ket_r.overlap(ket_plus) + assert np.isclose(ket_plus.overlap(ket_r), 0.5) + assert dm_g.overlap(ket_plus) == ket_plus.overlap(dm_g) + assert np.isclose(dm_g.overlap(ket_plus), 0.5) + + with pytest.raises(TypeError, match="expects another 'QutipState'"): + dm_g.overlap(ket_r.to_qobj()) + + with pytest.raises( + ValueError, + match="Can't calculate the overlap between a state with 1 " + "2-dimensional qudits and another with 2 3-dimensional qudits", + ): + ket_r.overlap( + QutipState.from_state_amplitudes( + eigenstates=("r", "g", "h"), amplitudes={"rr": 1.0} + ) + ) + + err_msg = ( + "Can't calculate the overlap between states with eigenstates " + "('r', 'g') and {}." + ) + with pytest.raises( + ValueError, match=re.escape(err_msg.format(("u", "d"))) + ): + ket_r.overlap( + QutipState(qutip.basis(2, 0), eigenstates=("u", "d")) + ) + with pytest.raises( + NotImplementedError, match=re.escape(err_msg.format(("g", "r"))) + ): + ket_r.overlap( + QutipState(qutip.basis(2, 0), eigenstates=("g", "r")) + ) + + def test_probabilities(self, ket_plus: QutipState): + amps = { + "rr": np.sqrt(0.5), + "gg": 1j * np.sqrt(0.5 - 1e-12), + "gr": 1e-6, + } + state = QutipState.from_state_amplitudes( + eigenstates=("r", "g"), amplitudes=amps + ) + probs = { + state_str: np.abs(amp) ** 2 for state_str, amp in amps.items() + } + state_probs = state.probabilities(cutoff=9e-13) + assert all(np.isclose(probs[k], state_probs[k]) for k in probs) + probs.pop("gr") + sum_ = sum(probs.values()) + probs = {k: v / sum_ for k, v in probs.items()} + state_probs = state.probabilities() + assert all(np.isclose(probs[k], state_probs[k]) for k in probs) + assert state.infer_one_state() == "r" + assert state.bitstring_probabilities() == { + "11": probs["rr"], + "00": probs["gg"], + } + assert state.bitstring_probabilities(one_state="g") == { + "11": probs["gg"], + "00": probs["rr"], + } + + dm_plus = QutipState( + qutip.ket2dm(ket_plus.to_qobj()), eigenstates=ket_plus.eigenstates + ) + assert dm_plus.probabilities() == {"r": 0.5, "g": 0.5} + assert dm_plus.bitstring_probabilities() == {"0": 0.5, "1": 0.5} + + def test_sample(self, ket_r: QutipState, dm_g: QutipState): + shots = 2000 + assert ket_r.sample(num_shots=shots) == {"1": shots} + assert ket_r.sample(num_shots=shots, one_state="g") == {"0": shots} + assert ket_r.sample(num_shots=shots, p_false_pos=0.1) == {"1": shots} + assert ket_r.sample(num_shots=shots, p_false_neg=0.1)["0"] > 0 + + assert dm_g.sample(num_shots=shots) == {"0": shots} + assert dm_g.sample(num_shots=shots, one_state="g") == {"1": shots} + assert dm_g.sample(num_shots=shots, p_false_neg=0.1) == {"0": shots} + assert dm_g.sample(num_shots=shots, p_false_pos=0.1)["1"] > 0 + + @pytest.mark.parametrize( + "amplitudes", + [ + {"rrh": 1.0}, + {"rr": 0.5, "rgg": np.sqrt(0.75)}, + ], + ) + def test_from_state_amplitudes_error(self, amplitudes): + with pytest.raises( + ValueError, + match=re.escape( + "All basis states must be combinations of eigenstates with " + f"the same length. Expected combinations of ('r', 'g'), each " + f"with {len(list(amplitudes)[0])} elements." + ), + ): + QutipState.from_state_amplitudes( + eigenstates=("r", "g"), amplitudes=amplitudes + ) + + def test_from_state_amplitudes(self): + assert QutipState.from_state_amplitudes( + eigenstates=("r", "g"), amplitudes={"g": 1.0} + ).to_qobj() == qutip.basis(2, 1) + assert QutipState.from_state_amplitudes( + eigenstates=("g", "r"), amplitudes={"g": 1.0} + ).to_qobj() == qutip.basis(2, 0) + assert QutipState.from_state_amplitudes( + eigenstates=("r", "g", "h"), amplitudes={"g": 1.0} + ).to_qobj() == qutip.basis(3, 1) + + r = qutip.basis(2, 0) + g = qutip.basis(2, 1) + assert QutipState.from_state_amplitudes( + eigenstates=("r", "g"), + amplitudes={"rr": -0.5j, "gr": 0.5, "rg": 0.5j, "gg": -0.5}, + ).to_qobj() == -0.5j * qutip.tensor([r, r]) + 0.5 * qutip.tensor( + [g, r] + ) + 0.5j * qutip.tensor( + [r, g] + ) - 0.5 * qutip.tensor( + [g, g] + ) + + def test_repr(self, ket_r): + assert repr(ket_r) == ( + "QutipState\n" + + "-" * len("QutipState") + + f"\nEigenstates: {ket_r.eigenstates}\n" + + repr(ket_r.to_qobj()) + ) + + def test_eq(self, ket_r, dm_g): + assert ket_r == QutipState.from_state_amplitudes( + eigenstates=("r", "g"), amplitudes={"r": 1.0} + ) + assert dm_g != QutipState.from_state_amplitudes( + eigenstates=("r", "g"), amplitudes={"g": 1.0} + ) + assert dm_g != qutip.basis(2, 1).proj() + + +class TestQutipOperator: + + def test_init(self): + with pytest.raises( + ValueError, + match="eigenstates must be represented by single characters", + ): + QutipOperator(qutip.sigmaz(), eigenstates=["ground", "rydberg"]) + with pytest.raises(ValueError, match="can't contain repeated entries"): + QutipOperator(qutip.sigmaz(), eigenstates=["r", "g", "r"]) + with pytest.raises(TypeError, match="must be a qutip.Qobj"): + QutipOperator(qutip.sigmaz().full(), eigenstates=["r", "g"]) + with pytest.raises( + TypeError, match="must be a qutip.Qobj with type 'oper'" + ): + QutipOperator(qutip.basis(2, 0), eigenstates=["r", "g"]) + with pytest.raises( + ValueError, match="incompatible with a system of 3-level qudits" + ): + QutipOperator(qutip.sigmaz(), eigenstates=["r", "g", "h"]) + + pauli_z = QutipOperator(qutip.sigmaz(), eigenstates=("r", "g")) + assert pauli_z.eigenstates == ("r", "g") + assert ( + pauli_z.to_qobj() + == qutip.basis(2, 0).proj() - qutip.basis(2, 1).proj() + ) + + @pytest.fixture + def pauli_i(self): + return QutipOperator(qutip.identity(2), eigenstates=("r", "g")) + + @pytest.fixture + def pauli_x(self): + return QutipOperator(qutip.sigmax(), eigenstates=("r", "g")) + + @pytest.fixture + def pauli_y(self): + return QutipOperator(qutip.sigmay(), eigenstates=("r", "g")) + + @pytest.fixture + def pauli_z(self): + return QutipOperator(qutip.sigmaz(), eigenstates=("r", "g")) + + @pytest.mark.parametrize("op_name", ["apply_to", "expect"]) + def test_errors_on_qutip_state(self, pauli_x, op_name): + op = getattr(pauli_x, op_name) + with pytest.raises( + TypeError, + match=re.escape( + f"'QutipOperator.{op_name}()' expects a 'QutipState' instance" + ), + ): + op(qutip.basis(2, 0)) + err_msg = ( + f"Can't apply QutipOperator.{op_name}() between a QutipOperator " + "with eigenstates ('r', 'g') and a QutipState with {}" + ) + with pytest.raises( + ValueError, match=re.escape(err_msg.format(("g", "h"))) + ): + op(QutipState(qutip.basis(2, 0), eigenstates=("g", "h"))) + with pytest.raises( + NotImplementedError, match=re.escape(err_msg.format(("g", "r"))) + ): + op(QutipState(qutip.basis(2, 0), eigenstates=("g", "r"))) + + @pytest.mark.parametrize("op_name", ["__add__", "__matmul__"]) + def test_errors_on_qutip_operator(self, pauli_x, op_name): + op = getattr(pauli_x, op_name) + with pytest.raises( + TypeError, + match=re.escape(f"'{op_name}' expects a 'QutipOperator' instance"), + ): + op(ket_r) + err_msg = ( + f"Can't apply {op_name} between a QutipOperator with eigenstates " + "('r', 'g') and a QutipOperator with {}" + ) + with pytest.raises( + ValueError, match=re.escape(err_msg.format(("g", "h"))) + ): + op(QutipOperator(qutip.basis(2, 0).proj(), eigenstates=("g", "h"))) + + with pytest.raises( + NotImplementedError, match=re.escape(err_msg.format(("g", "r"))) + ): + op(QutipOperator(qutip.basis(2, 0).proj(), eigenstates=("g", "r"))) + + def test_apply_to(self, ket_r, dm_g, pauli_x: QutipOperator): + assert pauli_x.apply_to(ket_r) == QutipState.from_state_amplitudes( + eigenstates=("r", "g"), amplitudes={"g": 1.0} + ) + assert pauli_x.apply_to(dm_g) == QutipState( + qutip.basis(2, 0).proj(), eigenstates=dm_g.eigenstates + ) + + def test_expect( + self, + pauli_x: QutipOperator, + pauli_y: QutipOperator, + pauli_z: QutipOperator, + ket_r, + dm_g, + ket_plus, + ): + assert pauli_x.expect(ket_r) == 0.0 + assert pauli_x.expect(dm_g) == 0.0 + assert np.isclose(pauli_x.expect(ket_plus), 1.0) + ket_minus = pauli_y.apply_to(ket_plus) + assert np.isclose(pauli_x.expect(ket_minus), -1.0) + + assert pauli_z.expect(ket_r) == 1.0 + assert pauli_z.expect(dm_g) == -1.0 + assert np.isclose(pauli_z.expect(ket_plus), 0.0) + + def test_add(self, pauli_x, pauli_y, pauli_z): + r = qutip.basis(2, 0) + g = qutip.basis(2, 1) + assert pauli_x + pauli_y == QutipOperator( + (1 - 1j) * r * g.dag() + (1 + 1j) * g * r.dag(), + eigenstates=pauli_x.eigenstates, + ) + + assert QutipOperator( + qutip.identity(2), eigenstates=pauli_z.eigenstates + ) + pauli_z == QutipOperator( + 2 * r.proj(), eigenstates=pauli_z.eigenstates + ) + + def test_rmul(self, pauli_i, pauli_z): + assert (1 - 2j) * pauli_i == QutipOperator( + (1 - 2j) * qutip.identity(2), eigenstates=pauli_z.eigenstates + ) + assert 0.5 * (pauli_i + pauli_z) == QutipOperator( + qutip.basis(2, 0).proj(), eigenstates=pauli_z.eigenstates + ) + + def test_matmul(self, pauli_i, pauli_x, pauli_y, pauli_z): + assert ( + pauli_x @ pauli_x + == pauli_y @ pauli_y + == pauli_z @ pauli_z + == pauli_i + ) + assert pauli_x @ pauli_z == -1j * pauli_y + assert pauli_z @ pauli_x == 1j * pauli_y + + def test_from_operator_repr(self, pauli_i): + with pytest.raises( + ValueError, + match="QuditOp key must be made up of two eigenstates; instead, " + "got 'gggg'", + ): + QutipOperator.from_operator_repr( + eigenstates=("r", "g"), + n_qudits=2, + operations=[(1.0, [({"gggg": 1.0, "rr": -1.0}, {0})])], + ) + + with pytest.raises( + ValueError, + match="QuditOp key must be made up of two eigenstates; instead, " + "got 'hh'", + ): + QutipOperator.from_operator_repr( + eigenstates=("r", "g"), + n_qudits=2, + operations=[(1.0, [({"hh": 1.0, "rr": -1.0}, {0})])], + ) + + with pytest.raises( + ValueError, + match="QuditOp key must be made up of two eigenstates; instead, " + "got 'hh'", + ): + QutipOperator.from_operator_repr( + eigenstates=("r", "g"), + n_qudits=2, + operations=[(1.0, [({"hh": 1.0, "rr": -1.0}, {0})])], + ) + + with pytest.raises( + ValueError, match="Got invalid indices for a system with 2 qudits" + ): + QutipOperator.from_operator_repr( + eigenstates=("r", "g"), + n_qudits=2, + operations=[(1.0, [({"gg": 1.0, "rr": -1.0}, {3, 5, 9})])], + ) + + with pytest.raises( + ValueError, + match=re.escape("only indices {1} were still available"), + ): + QutipOperator.from_operator_repr( + eigenstates=("r", "g"), + n_qudits=2, + operations=[ + ( + 1.0, + [ + ({"gg": 1.0, "rr": -1.0}, {0}), + ({"rg": 1.0, "rg": 1.0}, {0}), + ], + ) + ], + ) + + assert QutipOperator.from_operator_repr( + eigenstates=("r", "g", "h"), + n_qudits=3, + operations=[ + (1.0, [({"rr": 1.0, "hh": -1.0}, {0}), ({"gr": -1j}, {2})]) + ], + ) == QutipOperator( + qutip.tensor( + [ + qutip.basis(3, 0).proj() - qutip.basis(3, 2).proj(), + qutip.identity(3), + -1j * qutip.basis(3, 1) * qutip.basis(3, 0).dag(), + ] + ), + eigenstates=("r", "g", "h"), + ) + + assert ( + QutipOperator.from_operator_repr( + eigenstates=("r", "g"), + n_qudits=1, + operations=[(1, [])], + ) + == pauli_i + ) + + assert QutipOperator.from_operator_repr( + eigenstates=("r", "g"), + n_qudits=2, + operations=[(0.5, [({"rr": 1.0, "gg": -1.0}, {0})]), (0.5, [])], + ) == QutipOperator( + qutip.tensor( + [ + qutip.basis(2, 0).proj(), + qutip.identity(2), + ] + ), + eigenstates=("r", "g"), + ) + + def test_repr(self, pauli_z): + assert repr(pauli_z) == ( + "QutipOperator\n" + + "-" * len("QutipOperator") + + f"\nEigenstates: {pauli_z.eigenstates}\n" + + repr(pauli_z.to_qobj()) + ) + + def test_eq(self, pauli_i, pauli_z, dm_g): + g_proj = 0.5 * (pauli_i + (-1) * pauli_z) + assert g_proj == QutipOperator( + qutip.basis(2, 1).proj(), eigenstates=pauli_i.eigenstates + ) + assert g_proj != dm_g diff --git a/tests/test_result.py b/tests/test_result.py index feaee1c20..ee74552ef 100644 --- a/tests/test_result.py +++ b/tests/test_result.py @@ -20,6 +20,8 @@ import pytest import qutip +import pulser.result +from pulser.backend.results import ResultsSequence, ResultsType from pulser.result import SampledResult from pulser_simulation.qutip_result import QutipResult @@ -234,3 +236,16 @@ def test_qutip_result_density_matrices(): "10": 0.25, "11": 0.25, } + + +@pytest.mark.parametrize( + "old_name, obj", + [("Results", ResultsSequence), ("ResultType", ResultsType)], +) +def test_legacy_imports(old_name, obj): + with pytest.warns( + DeprecationWarning, + match=f"'pulser.result.{old_name}' class has been renamed " + f"to '{obj.__name__}'", + ): + assert getattr(pulser.result, old_name) == obj