Skip to content

Commit

Permalink
mypy
Browse files Browse the repository at this point in the history
  • Loading branch information
NicoRenaud committed Nov 1, 2023
1 parent 16b99c3 commit 4225ce0
Show file tree
Hide file tree
Showing 8 changed files with 69 additions and 41 deletions.
4 changes: 3 additions & 1 deletion vqls_prototype/hadamard_test/direct_hadamard_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,9 @@ def post_processing(self, sampler_result) -> npt.NDArray[np.cdouble]:

return np.array(val)

def get_value(self, sampler, parameter_sets: List, zne_strategy=None) -> List:
def get_value(
self, sampler, parameter_sets: List, zne_strategy=None
) -> npt.NDArray[np.cdouble]:
"""Compute the value of the test
Args:
Expand Down
39 changes: 31 additions & 8 deletions vqls_prototype/matrix_decomposition/matrix_decomposition.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
from itertools import chain, combinations
from collections import namedtuple
from itertools import product
from typing import Optional, Union, List, Tuple, TypeVar, cast
from typing import Optional, Union, List, Tuple, TypeVar, cast, Iterator


import numpy as np
Expand Down Expand Up @@ -64,7 +64,7 @@ def __init__(
"""

if load is not None:
self._coefficients, self._matrices, self._circuits = self.load(load)
self.load(load)

elif matrix is not None: # ignore circuits & coefficients
self.sparse_matrix = spsp.issparse(matrix)
Expand Down Expand Up @@ -158,11 +158,29 @@ def circuits(self) -> List[QuantumCircuit]:
"""circuits of the decomposition"""
return self._circuits

@circuits.setter
def circuits(self, circuits: List[QuantumCircuit]) -> None:
"""Set new circuits
Args:
circuits (List[QuantumCircuit]): new circuits
"""
self._circuits = circuits

@property
def coefficients(self) -> complex_array_type:
"""coefficients of the decomposition."""
return self._coefficients

@coefficients.setter
def coefficients(self, new_coefficients: np.ndarray) -> None:
"""Set the values of the coefficients
Args:
new_coefficients (np.ndarray): new values
"""
self._coefficients = new_coefficients

@property
def matrices(self) -> List[complex_array_type]:
"""return the unitary matrices"""
Expand Down Expand Up @@ -330,7 +348,7 @@ class PauliDecomposition(MatrixDecomposition):

def __init__(
self,
matrix: Optional[npt.NDArray] = None,
matrix: Optional[Union[npt.NDArray, spsp.csr_array]] = None,
circuits: Optional[Union[QuantumCircuit, List[QuantumCircuit]]] = None,
coefficients: Optional[
Union[float, complex, List[float], List[complex]]
Expand Down Expand Up @@ -380,6 +398,9 @@ def get_possible_pauli_strings(self) -> List:
Returns:
List: list of pauli strings
"""

assert isinstance(self._matrix, spsp.csr_array)

# if we use the sparse decomposition
if self.use_sparse:
# for now convert to coo and extract indices
Expand All @@ -400,7 +421,7 @@ def get_possible_pauli_strings(self) -> List:
return list(set(possible_pauli_strings))

# if we use the full decomposition
return product(self.basis, repeat=self.num_qubits)
return list(product(self.basis, repeat=self.num_qubits))

def decompose_matrix(
self,
Expand All @@ -414,7 +435,8 @@ def decompose_matrix(
"""

prefactor = 1.0 / (2**self.num_qubits)
unit_mats, coeffs, circuits = [], [], []
unit_mats: List[complex_array_type] = []
coeffs, circuits = [], []
self.strings = []
possible_pauli_strings = self.get_possible_pauli_strings()
for pauli_gates in tqdm(possible_pauli_strings):
Expand All @@ -428,7 +450,7 @@ def decompose_matrix(
pauli_op.to_matrix(sparse=True) @ self.matrix
).trace()
else:
coef: complex_array_type = np.einsum("ij,ji", pauli_op, self.matrix)
coef: complex_array_type = np.einsum("ij,ji", pauli_op, self.matrix) # type: ignore

if coef * np.conj(coef) != 0:
self.strings.append(pauli_string)
Expand All @@ -451,7 +473,8 @@ def load(self, filename) -> None:
Args:
filename (str): name of the file
"""
self.strings, self.coefficients = np.load(filename)
pauli_strings, coeffs = np.load(filename)
self.strings
self.circuits = []
for pauli_string in self.strings:
self.circuits.append(self._create_circuit(pauli_string))
Expand All @@ -474,7 +497,7 @@ def get_off_diagonal_element_pauli_strings(
x_matrix = np.array([[0, 1], [1, 0]])
# shift = 0

def powerset(iterable: List) -> List:
def powerset(iterable: List) -> Iterator:
"""Create a powerset (0,2) -> [(), (0), (2), (0,2)]
Args:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,9 +69,7 @@ def __init__(
super().__init__(matrix, circuits, coefficients, load, sparse)
self.contract_pauli_terms(build_circuits=True)

def contract_pauli_terms(
self, build_circuits=True
) -> Tuple[complex_array_type, List[complex_array_type], List[QuantumCircuit]]:
def contract_pauli_terms(self, build_circuits=True) -> None:
"""Compute the contractions of the Pauli Strings.
Returns:
Expand All @@ -95,7 +93,7 @@ def contract_pauli_terms(
for i2 in range(i1 + 1, nstrings):
# extract pauli strings
pauli_string_1, pauli_string_2 = self.strings[i1], self.strings[i2]
contracted_pauli_string, contracted_coefficient = "", 1.0
contracted_pauli_string, contracted_coefficient = "", 1.0 + 0.0j

# contract pauli gates qubit wise
for ip in str_len:
Expand Down
9 changes: 4 additions & 5 deletions vqls_prototype/solver/base_solver.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
VariationalLinearSolverResult,
)

from ..matrix_decomposition.matrix_decomposition import MatrixDecomposition
from .log import VQLSLog


Expand Down Expand Up @@ -90,7 +91,7 @@ def __init__(
self._eval_count = 0

self.vector_circuit = QuantumCircuit(0)
self.matrix_circuits = None # QuantumCircuit(0)
self.matrix_circuits: MatrixDecomposition = None # QuantumCircuit(0)

logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
Expand Down Expand Up @@ -158,14 +159,12 @@ def max_evals_grouped(self, max_evals_grouped: int):
self.optimizer.set_max_evals_grouped(max_evals_grouped)

@property
def callback(self) -> Optional[Callable[[int, np.ndarray, float, float], None]]:
def callback(self) -> Callable[[int, float, np.ndarray], None]:
"""Returns callback"""
return self._callback

@callback.setter
def callback(
self, callback: Optional[Callable[[int, np.ndarray, float, float], None]]
):
def callback(self, callback: Callable[[int, float, np.ndarray], None]):
"""Sets callback"""
self._callback = callback

Expand Down
22 changes: 11 additions & 11 deletions vqls_prototype/solver/hybrid_qst_vqls.py
Original file line number Diff line number Diff line change
Expand Up @@ -198,8 +198,8 @@ def construct_circuit(
raise NotImplementedError("The rhs vector cannot be a quantum circuit")

if isinstance(vector, np.ndarray):
self.vector_norm = np.linalg.norm(vector)
self.vector_amplitude = vector / self.vector_norm
self.vector_norm = np.linalg.norm(vector) # type: ignore
self.vector_amplitude = vector / self.vector_norm # type: ignore

if (self.options["reuse_matrix"] is True) and (
self.matrix_circuits is not None
Expand All @@ -212,8 +212,8 @@ def construct_circuit(
self.matrix_circuits = OptimizedPauliDecomposition(matrix, vector)

# a single circuit
elif issubclass(matrix, OptimizedPauliDecomposition):
self.matrix_circuits = matrix
elif issubclass(matrix, OptimizedPauliDecomposition): # type: ignore
self.matrix_circuits = matrix # type: ignore

else:
raise ValueError(
Expand All @@ -232,7 +232,7 @@ def _get_norm_circuits(self) -> List[QuantumCircuit]:
circuits = []
for (
circ
) in self.matrix_circuits.optimized_measurement.shared_basis_transformation:
) in self.matrix_circuits.optimized_measurement.shared_basis_transformation: # type: ignore
circuits.append(
DirectHadamardTest(
operators=circ,
Expand Down Expand Up @@ -430,15 +430,15 @@ def _init_tomography(self, tomography: str, num_shadows=None):
tomography (str): the name of the tomography
"""
if tomography == "simulator":
self.tomography_calculator = SimulatorQST(self._ansatz)
self.tomography_calculator = SimulatorQST(self._ansatz) # type: ignore
elif tomography == "htree":
self.tomography_calculator = HTreeQST(self._ansatz, self.sampler)
self.tomography_calculator = HTreeQST(self._ansatz, self.sampler) # type: ignore
elif tomography == "full":
self.tomography_calculator = FullQST(
self.tomography_calculator = FullQST( # type: ignore
self._ansatz, Aer.get_backend("statevector_simulator")
)
elif tomography == "shadow":
self.tomography_calculator = ShadowQST(
self.tomography_calculator = ShadowQST( # type: ignore
self._ansatz, self.sampler, num_shadows
)
else:
Expand Down Expand Up @@ -527,10 +527,10 @@ def _solve(
samples = BatchDirectHadammardTest(norm_circuits).get_values(
self.sampler, solution.optimal_point
)
solution.vector = self.tomography_calculator.get_statevector(
solution.vector = self.tomography_calculator.get_statevector( # type: ignore
solution.optimal_point,
samples=self.reformat_samples_for_shadows(samples),
labels=self.matrix_circuits.optimized_measurement.shared_basis_string,
labels=self.matrix_circuits.optimized_measurement.shared_basis_string, # type: ignore
)

return solution
5 changes: 4 additions & 1 deletion vqls_prototype/solver/log.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,15 @@
from dataclasses import dataclass
from typing import List
import numpy as np


@dataclass
class VQLSLog:
values: List
parameters: List

def update(self, count, cost, parameters): # pylint: disable=unused-argument
def update(
self, count: int, cost: float, parameters: np.ndarray
) -> None: # pylint: disable=unused-argument
self.values.append(cost)
self.parameters.append(parameters)
15 changes: 8 additions & 7 deletions vqls_prototype/solver/qst_vqls.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
VariationalLinearSolverResult,
)

from ..matrix_decomposition.matrix_decomposition import MatrixDecomposition

from ..matrix_decomposition.optimized_matrix_decomposition import (
ContractedPauliDecomposition,
Expand Down Expand Up @@ -179,7 +180,7 @@ def __init__(

def preprocessing_matrices(
self,
matrix: Union[np.ndarray, QuantumCircuit, List],
matrix: Union[np.ndarray, QuantumCircuit, List, ContractedPauliDecomposition],
vector: Union[np.ndarray, QuantumCircuit],
) -> None:
"""Returns the a list of circuits required to compute the expectation value
Expand All @@ -201,8 +202,8 @@ def preprocessing_matrices(
raise NotImplementedError("We didn't do that yet")

if isinstance(vector, np.ndarray):
self.vector_norm = np.linalg.norm(vector)
self.vector_amplitude = vector / self.vector_norm
self.vector_norm = np.linalg.norm(vector) # type: ignore
self.vector_amplitude = vector / self.vector_norm # type: ignore

if (self.options["reuse_matrix"] is True) and (
self.matrix_circuits is not None
Expand All @@ -214,8 +215,8 @@ def preprocessing_matrices(
self.matrix_circuits = ContractedPauliDecomposition(matrix, vector)

# a single circuit
elif issubclass(matrix, ContractedPauliDecomposition):
self.matrix_circuits = matrix
elif issubclass(matrix, ContractedPauliDecomposition): # type: ignore
self.matrix_circuits = matrix # type: ignore

else:
raise ValueError(
Expand Down Expand Up @@ -338,7 +339,7 @@ def _assemble_cost_function(

return cost

def get_cost_evaluation_function( # pylint: disable=arguments-differ
def get_cost_evaluation_function( # type: ignore[override] # pylint: disable=arguments-differ
self,
coefficient_matrix: np.ndarray,
) -> Callable[[np.ndarray], Union[float, List[float]]]:
Expand Down Expand Up @@ -520,7 +521,7 @@ def _solve(
solution.state = self.ansatz.assign_parameters(solution.optimal_parameters)

# solution vector
solution.vector = self.tomography_calculator.get_statevector(
solution.vector = self.tomography_calculator.get_statevector( # type: ignore
solution.optimal_point
)

Expand Down
10 changes: 6 additions & 4 deletions vqls_prototype/solver/vqls.py
Original file line number Diff line number Diff line change
Expand Up @@ -297,7 +297,9 @@ def _get_norm_circuits(self) -> List[QuantumCircuit]:

# use measurement optimized Pauli decomposition
if isinstance(self.matrix_circuits, OptimizedPauliDecomposition):
for circ in self.matrix_circuits.qwc_groups_shared_basis_transformation:
for (
circ
) in self.matrix_circuits.optimized_measurement.shared_basis_transformation:
hdmr_tests_norm.append(
DirectHadamardTest(
operators=circ,
Expand All @@ -306,11 +308,11 @@ def _get_norm_circuits(self) -> List[QuantumCircuit]:
)
)

# use contracted Pauli Decomposition
# use contracted Pauli Decomposition: SHOULD IT BE DIRECTHADAMARDTEST ?!
elif isinstance(self.matrix_circuits, ContractedPauliDecomposition):
for circ in self.matrix_circuits.contracted_circuits:
hdmr_tests_norm.append(
HadammardTest(
HadammardTest( # type: ignore[arg-type]
operators=[circ],
apply_initial_state=self._ansatz,
apply_measurement=False,
Expand All @@ -326,7 +328,7 @@ def _get_norm_circuits(self) -> List[QuantumCircuit]:
for jj_mat in range(ii_mat + 1, len(self.matrix_circuits)):
mat_j = self.matrix_circuits[jj_mat]
hdmr_tests_norm.append(
HadammardTest(
HadammardTest( # type: ignore[arg-type]
operators=[mat_i.circuit.inverse(), mat_j.circuit],
apply_initial_state=self._ansatz,
apply_measurement=False,
Expand Down

0 comments on commit 4225ce0

Please sign in to comment.