diff --git a/blackwater/data/__init__.py b/blackwater/data/__init__.py index 023c20b..69cca8c 100644 --- a/blackwater/data/__init__.py +++ b/blackwater/data/__init__.py @@ -5,52 +5,45 @@ .. currentmodule:: blackwater.data -Classes -======= +Circuit encoders +================ .. autosummary:: :toctree: ../stubs/ - ExpValDataSet - CircuitGraphExpValMitigationDataset - DefaultNumpyEstimatorInputEncoder - NodeEncoder - DefaultNodeEncoder - BackendNodeEncoder - DefaultPyGEstimatorEncoder - PygData - ExpValData - -Functions + DefaultCircuitEncoder + DefaultPyGCircuitEncoder + +Operator encoders +================= + +.. autosummary:: + :toctree: ../stubs/ + + DefaultOperatorEncoder + +Backend encoders +================ + +.. autosummary:: + :toctree: ../stubs/ + + DefaultPyGBackendEncoder + +Utilities ========= .. autosummary:: :toctree: ../stubs/ - extract_properties_from_backend - circuit_to_json_graph - backend_to_json_graph - encode_pauli_sum_operator - encode_operator - encode_sparse_pauli_operatpr + DefaultNumpyEstimatorInputEncoder """ -from .loaders.dataclasses import ExpValDataSet -from .loaders.exp_val import CircuitGraphExpValMitigationDataset -from .encoders.numpy import DefaultNumpyEstimatorInputEncoder -from .encoders.torch import ( - NodeEncoder, +from .encoders.primtives_utils import DefaultNumpyEstimatorInputEncoder +from .encoders.backend import DefaultPyGBackendEncoder +from .encoders.operator import DefaultOperatorEncoder +from .encoders.circuit import ( DefaultNodeEncoder, - BackendNodeEncoder, - DefaultPyGEstimatorEncoder, - extract_properties_from_backend, - circuit_to_json_graph, - backend_to_json_graph, - PygData, - ExpValData, -) -from .encoders.utils import ( - encode_pauli_sum_operator, - encode_operator, - encode_sparse_pauli_operatpr, + DefaultCircuitEncoder, + DefaultPyGCircuitEncoder, ) diff --git a/blackwater/data/core.py b/blackwater/data/core.py index d08c42f..4fa84cf 100644 --- a/blackwater/data/core.py +++ b/blackwater/data/core.py @@ -1,10 +1,19 @@ """Dataclasses module.""" +from abc import abstractmethod from dataclasses import dataclass +from typing import List, Any +from qiskit import QuantumCircuit +from qiskit.dagcircuit import DAGNode +from qiskit.providers import Backend +from qiskit.quantum_info import Operator + +# pylint: disable=arguments-differ class DataEncoder: """Base data encode class.""" + @abstractmethod def encode(self, **kwargs): """Encodes data @@ -17,6 +26,54 @@ def encode(self, **kwargs): raise NotImplementedError +class DataDecoder: + """Base data decoder class.""" + + @classmethod + def decode(cls, data: Any): + """Decodes from data to object. + + Args: + data: encoded data + + Returns: + decoded object + """ + raise NotImplementedError + + +class CircuitEncoder(DataEncoder): + """Base encoder class for circuit objects.""" + + @abstractmethod + def encode(self, circuit: QuantumCircuit, **kwargs): # type: ignore + raise NotImplementedError + + +class OperatorEncoder(DataEncoder): + """Base encoder class for operator objects.""" + + @abstractmethod + def encode(self, operator: Operator, **kwargs): # type: ignore + raise NotImplementedError + + +class BackendEncoder(DataEncoder): + """Base encoder class for backend objects.""" + + @abstractmethod + def encode(self, backend: Backend, **kwargs): # type: ignore + raise NotImplementedError + + +class NodeEncoder(DataEncoder): + """Base class for circuit dag node encoder.""" + + def encode(self, node: DAGNode, **kwargs) -> List[float]: # type: ignore + """Encodes node of circuit dag.""" + raise NotImplementedError + + # pylint: disable=no-member @dataclass class BlackwaterData: diff --git a/blackwater/data/dataio/dataio.py b/blackwater/data/dataio/dataio.py index f985142..afdcc1d 100644 --- a/blackwater/data/dataio/dataio.py +++ b/blackwater/data/dataio/dataio.py @@ -4,7 +4,7 @@ from typing import List from blackwater.data.core import BlackwaterData -from blackwater.data.encoders.torch import ExpValData +from blackwater.data.encoders.graph_utils import ExpValData # pylint: disable=unspecified-encoding diff --git a/blackwater/data/encoders/backend.py b/blackwater/data/encoders/backend.py new file mode 100644 index 0000000..e56ad2e --- /dev/null +++ b/blackwater/data/encoders/backend.py @@ -0,0 +1,35 @@ +"""Backend encoders.""" + +from typing import Union + +import torch +from qiskit.circuit.library import get_standard_gate_name_mapping +from qiskit.providers import BackendV1, BackendV2 +from torch_geometric.data import Data + +from blackwater.data.core import BackendEncoder +from blackwater.data.encoders.graph_utils import backend_to_json_graph + +N_QUBIT_PROPERTIES = 2 +ALL_INSTRUCTIONS = list(get_standard_gate_name_mapping().keys()) + + +# pylint: disable=no-member +class DefaultPyGBackendEncoder(BackendEncoder): + """Default pytorch geometric backend encoder. + + Turns backend into pyg data. + """ + + def encode(self, backend: Union[BackendV1, BackendV2], **kwargs): # type: ignore + backend_graph = backend_to_json_graph(backend) + backend_nodes = torch.tensor(backend_graph.nodes, dtype=torch.float) + backend_edges = torch.transpose( + torch.tensor(backend_graph.edges, dtype=torch.float), 0, 1 + ) + backend_edge_features = torch.tensor( + backend_graph.edge_features, dtype=torch.float + ) + return Data( + x=backend_nodes, edge_index=backend_edges, edge_attr=backend_edge_features + ) diff --git a/blackwater/data/encoders/circuit.py b/blackwater/data/encoders/circuit.py new file mode 100644 index 0000000..fc06f65 --- /dev/null +++ b/blackwater/data/encoders/circuit.py @@ -0,0 +1,95 @@ +"""Circuit encoders.""" + +from typing import Optional + +import numpy as np +import torch +from qiskit import QuantumCircuit +from qiskit.providers import BackendV2 +from torch_geometric.data import Data + +from blackwater.data.core import CircuitEncoder +from blackwater.data.encoders.graph_utils import ( + DefaultNodeEncoder, + circuit_to_json_graph, + BackendNodeEncoder, +) + + +# pylint: disable=no-member +class DefaultCircuitEncoder(CircuitEncoder): + """Default circuit encoder to transform circuit into numpy array for training + + Returns: + numpy array where: + - first element - depth of circuit + - second element - 2q depth of circuit + - 3rd - number of 1q gates + - 4th - number of 2q gates + - 5th - num qubits + """ + + def encode(self, circuit: QuantumCircuit, **kwargs) -> np.ndarray: # type: ignore + """Encodes circuit. + + Args: + circuit: circuit to encoder + **kwargs: other arguments + + Returns: + numpy array + """ + depth = circuit.depth() + two_qubit_depth = circuit.depth(lambda x: x[0].num_qubits == 2) + + num_one_q_gates = 0 + num_two_q_gates = 0 + for instr in circuit._data: + num_qubits = len(instr.qubits) + if num_qubits == 1: + num_one_q_gates += 1 + if num_qubits == 2: + num_two_q_gates += 1 + + return np.array( + [ + depth, + two_qubit_depth, + num_one_q_gates, + num_two_q_gates, + circuit.num_qubits, + ] + ) + + +class DefaultPyGCircuitEncoder(CircuitEncoder): + """Default pytorch geometric circuit encoder. + + Turns circuit into pyg data. + """ + + def __init__(self, backend: Optional[BackendV2]): + """Constructor. + + Args: + backend: optional backend. Will be used for node data encoding. + """ + self.backend = backend + + def encode(self, circuit: QuantumCircuit, **kwargs): # type: ignore + node_encoder = ( + DefaultNodeEncoder() + if self.backend is None + else BackendNodeEncoder(self.backend) + ) + circuit_graph = circuit_to_json_graph(circuit, node_encoder=node_encoder) + circuit_nodes = torch.tensor(circuit_graph.nodes, dtype=torch.float) + circuit_edges = torch.transpose( + torch.tensor(circuit_graph.edges, dtype=torch.long), 0, 1 + ) + circuit_edge_features = torch.tensor( + circuit_graph.edge_features, dtype=torch.float + ) + return Data( + x=circuit_nodes, edge_index=circuit_edges, edge_attr=circuit_edge_features + ) diff --git a/blackwater/data/encoders/torch.py b/blackwater/data/encoders/graph_utils.py similarity index 93% rename from blackwater/data/encoders/torch.py rename to blackwater/data/encoders/graph_utils.py index 8b4ff5c..76ff179 100644 --- a/blackwater/data/encoders/torch.py +++ b/blackwater/data/encoders/graph_utils.py @@ -1,6 +1,5 @@ """Graph encoders.""" -from abc import ABC from dataclasses import dataclass, asdict from typing import Union, List, Dict, Optional, Tuple @@ -16,7 +15,7 @@ from qiskit.transpiler import Target from torch_geometric.data import Data -from blackwater.data.core import DataEncoder, BlackwaterData +from blackwater.data.core import DataEncoder, BlackwaterData, NodeEncoder from blackwater.data.encoders.utils import OperatorData, encode_operator from blackwater.exception import BlackwaterException @@ -24,15 +23,7 @@ ALL_INSTRUCTIONS = list(get_standard_gate_name_mapping().keys()) -# pylint: disable=no-member -class NodeEncoder(ABC): - """Base class for circuit dag node encoder.""" - - def encode(self, node: DAGNode) -> List[float]: - """Encodes node of circuit dag.""" - raise NotImplementedError - - +# pylint: disable=no-member, arguments-differ class DefaultNodeEncoder(NodeEncoder): """DefaultNodeEncoder.""" @@ -58,7 +49,7 @@ def __init__(self, available_instructions: Optional[List[str]] = None): for idx, inst in enumerate(available_instructions) } - def encode(self, node: DAGNode) -> List[float]: + def encode(self, node: DAGNode, **kwargs) -> List[float]: # type: ignore if isinstance(node, DAGOpNode): params_encoding = [0.0, 0.0, 0.0] for i, param in enumerate(node.op.params): @@ -144,7 +135,7 @@ def __init__(self, backend: BackendV2): self.num_qubits = backend.num_qubits self.properties: BackendProperties = extract_properties_from_backend(backend) - def encode(self, node: DAGNode) -> List[float]: + def encode(self, node: DAGNode, **kwargs) -> List[float]: # type: ignore if isinstance(node, DAGOpNode): params_encoding = [0.0, 0.0, 0.0] for i, param in enumerate(node.op.params): @@ -380,19 +371,18 @@ class DefaultPyGEstimatorEncoder(DataEncoder): """Default encoder for pyg data. Converts circuit data into torch_geometric.Data""" - def encode(self, **kwargs) -> Tuple[Data, float]: - circuit: QuantumCircuit = kwargs.get("circuit") - operator: PauliSumOp = kwargs.get("operator") - exp_value = kwargs.get("exp_val") - backend = kwargs.get("backend") - - if circuit is None or operator is None or exp_value is None or backend is None: - raise BlackwaterException("Missing encoder input.") - + def encode( # type: ignore + self, + circuit: QuantumCircuit, + operator: PauliSumOp, + exp_val: float, + backend: BackendV2, + **kwargs, + ) -> Tuple[Data, float]: data = ExpValData.build( circuit=circuit, - expectation_values=[exp_value], + expectation_values=[exp_val], observable=operator, backend=backend, ).to_pyg() - return data, exp_value + return data, exp_val diff --git a/blackwater/data/encoders/operator.py b/blackwater/data/encoders/operator.py new file mode 100644 index 0000000..c921b50 --- /dev/null +++ b/blackwater/data/encoders/operator.py @@ -0,0 +1,27 @@ +"""Operator encoders.""" + +import numpy as np +from qiskit.quantum_info import Operator + +from blackwater.data.core import OperatorEncoder +from blackwater.data.encoders.utils import encode_operator + + +class DefaultOperatorEncoder(OperatorEncoder): + """Default operator encoder to turn operator class into numpy array.""" + + def encode(self, operator: Operator, **kwargs) -> np.ndarray: # type: ignore + """Encodes operator. + + Args: + operator: operator to encoder + **kwargs: other arguments + + Returns: + numpy array + """ + operator_encoding = [] + for entry in encode_operator(operator).operator: + operator_encoding += entry + + return np.array(operator_encoding) diff --git a/blackwater/data/encoders/numpy.py b/blackwater/data/encoders/primtives_utils.py similarity index 53% rename from blackwater/data/encoders/numpy.py rename to blackwater/data/encoders/primtives_utils.py index cc3f700..3dbc023 100644 --- a/blackwater/data/encoders/numpy.py +++ b/blackwater/data/encoders/primtives_utils.py @@ -5,7 +5,8 @@ from qiskit.opflow import PauliSumOp from blackwater.data.core import DataEncoder -from blackwater.data.encoders.utils import encode_operator +from blackwater.data.encoders.circuit import DefaultCircuitEncoder +from blackwater.data.encoders.operator import DefaultOperatorEncoder class DefaultNumpyEstimatorInputEncoder(DataEncoder): @@ -33,29 +34,7 @@ def encode(self, **kwargs): operator: PauliSumOp = kwargs.get("operator") exp_value = kwargs.get("exp_val") - depth = circuit.depth() - two_qubit_depth = circuit.depth(lambda x: x[0].num_qubits == 2) - - num_one_q_gates = 0 - num_two_q_gates = 0 - for instr in circuit._data: - num_qubits = len(instr.qubits) - if num_qubits == 1: - num_one_q_gates += 1 - if num_qubits == 2: - num_two_q_gates += 1 - - circuit_encoding = [ - depth, - two_qubit_depth, - num_one_q_gates, - num_two_q_gates, - circuit.num_qubits, - ] - - operator_encoding = [] - for entry in encode_operator(operator).operator: - operator_encoding += entry - - data_encoding = [exp_value] + circuit_encoding + operator_encoding - return np.array(data_encoding) + circuit_encoding = DefaultCircuitEncoder().encode(circuit) + operator_encoding = DefaultOperatorEncoder().encode(operator) + + return np.concatenate([[exp_value], circuit_encoding, operator_encoding]) diff --git a/blackwater/data/encoders/utils.py b/blackwater/data/encoders/utils.py index 1a576b5..d52d9a9 100644 --- a/blackwater/data/encoders/utils.py +++ b/blackwater/data/encoders/utils.py @@ -1,11 +1,23 @@ """Operator encoders.""" from dataclasses import dataclass -from typing import Union, List +from typing import Optional, List, Dict, Union, Any +import numpy as np +import torch +from qiskit import QuantumCircuit +from qiskit.circuit import Qubit +from qiskit.converters import circuit_to_dag +from qiskit.dagcircuit import DAGOpNode, DAGInNode, DAGOutNode from qiskit.opflow import PauliSumOp -from qiskit.quantum_info import SparsePauliOp +from qiskit.primitives import BaseEstimator +from qiskit.providers import BackendV1 +from qiskit.quantum_info import random_pauli_list, SparsePauliOp from qiskit.quantum_info.operators.base_operator import BaseOperator +from qiskit_aer import AerSimulator +from qiskit_aer.primitives import Estimator as AerEstimator +from torch_geometric.data import Data +# pylint: disable=no-member from blackwater.exception import BlackwaterException @@ -73,3 +85,492 @@ def encode_operator(operator: Union[BaseOperator]) -> OperatorData: ) return OperatorData(result) + + +available_gate_names = [ + # one qubit gates + "id", + "u1", + "u2", + "u3", + "x", + "y", + "z", + "h", + "s", + "sdg", + "t", + "tdg", + "rx", + "ry", + "rz", + # two qubit gates + "cx", + "cy", + "cz", + "ch", + "crz", + "cu1", + "cu3", + "swap", + "rzz", + # three qubit gates + "ccx", + "cswap", +] + + +def circuit_to_pyg_data( + circuit: QuantumCircuit, gate_set: Optional[List[str]] = None +) -> Data: + """Convert circuit to Pytorch geometric data. + Note: Homogenous conversion. + + Args: + circuit: quantum circuit + gate_set: list of instruction to use to encode data. + if None, default instruction set will be used + + Returns: + Pytorch geometric data + """ + num_qubits = circuit.num_qubits + gate_set = gate_set or available_gate_names + # add "other" gates + gate_set += ["barrier", "measure", "delay"] + + dag_circuit = circuit_to_dag(circuit) + + nodes = list(dag_circuit.nodes()) + edges = list(dag_circuit.edges()) + + nodes_dict: Dict[DAGOpNode, List[float]] = {} + + for _, node in enumerate(nodes): + if isinstance(node, (DAGInNode, DAGOutNode)): + # TODO: use in and out nodes + pass + elif isinstance(node, DAGOpNode): + # TODO: use node.cargs + + # get information on which qubits this gate is operating + affected_qubits = [0.0] * num_qubits + for arg in node.qargs: + affected_qubits[arg.index] = 1.0 + + # encoding of gate name + gate_encoding = [0.0] * len(gate_set) + gate_encoding[gate_set.index(node.op.name)] = 1.0 + + # gate parameters + node_params = [0.0, 0.0, 0.0] + for i, param in enumerate(node.op.params): + node_params[i] = param + + feature_vector = gate_encoding + affected_qubits + node_params + + nodes_dict[node] = feature_vector + + nodes_indices = {node: idx for idx, node in enumerate(nodes_dict.keys())} + + edge_index = [] + edge_attr = [] + + for edge in edges: + source, dest, _ = edge + + if isinstance(source, DAGOpNode) and isinstance(dest, DAGOpNode): + edge_index.append([nodes_indices[source], nodes_indices[dest]]) + edge_attr.append([0.0]) + else: + # TODO: handle in and out nodes + pass + + return Data( + x=torch.tensor(list(nodes_dict.values()), dtype=torch.float), + edge_index=torch.tensor(np.transpose(edge_index), dtype=torch.long), + edge_attr=torch.tensor(np.transpose(edge_attr), dtype=torch.float), + circuit_depth=torch.tensor([[circuit.depth()]], dtype=torch.long), + ) + + +def gate_to_index(gate: Any): + """Converts gate to key: + f(gate(cx, [0, 1])) -> 'cx_0_1' + + Args: + gate: gate + + Returns: + key + """ + return f"{gate.gate}_{'_'.join([str(i) for i in gate.qubits])}" + + +def get_backend_properties_v1(backend: BackendV1): + """Get properties from BackendV1 + + Args: + backend: backend + + Returns: + json with backend information + """ + props = backend.properties() + + def get_parameters(gate): + return { + **{"gate_error": 0.0, "gate_length": 0.0}, + **{param.name: param.value for param in gate.parameters}, + } + + return { + "name": backend.name(), + "gates_set": list({g.gate for g in props.gates}), + "num_qubits": len(props.qubits), + "qubits_props": { + index: { + "index": index, + "t1": props.qubit_property(index).get("T1", (0, 0))[0], + "t2": props.qubit_property(index).get("T2", (0, 0))[0], + "readout_error": props.qubit_property(index).get( + "readout_error", (0, 0) + )[0], + } + for index in range(len(props.qubits)) + }, + "gate_props": { + gate_to_index(gate): {"index": gate_to_index(gate), **get_parameters(gate)} + for gate in props.gates + }, + } + + +def counts_to_feature_vector(counts: dict, num_qubits: int) -> List[float]: + """Convert counts to feature vector. + + Args: + counts: counts + num_qubits: number of qubits + + Returns: + list of floats + """ + count_format = "{:0" + str(num_qubits) + "b}" + all_possible_measurements = { + count_format.format(i): 0 for i in range(2**num_qubits) + } + + shots = sum(counts.values()) + all_counts = {**all_possible_measurements, **counts} + return list(float(v) / shots for v in all_counts.values()) + + +def circuit_to_graph_data_json( + circuit: QuantumCircuit, + properties: dict, + use_gate_features: bool = False, + use_qubit_features: bool = False, +): + """Converts circuit to json (dict) for PyG data. + + Args: + circuit: quantum circuit + properties: call get_backend_properties_v1 for backend + use_gate_features: use gate features in data graph + use_qubit_features: use qubit features in data graph + """ + + # feature map for gate types + additional_gate_types = [ + "barrier", + "measure", + # "delay" + ] + gate_type_feature_map = { + g_name: index + for index, g_name in enumerate(properties["gates_set"] + additional_gate_types) + } + + # convert to dag + dag_circuit = circuit_to_dag(circuit) + + nodes = list(dag_circuit.nodes()) + edges = list(dag_circuit.edges()) + + # get node data + nodes_dict: Dict[str, Dict[str, Any]] = { + "DAGOpNode": {}, + "DAGInNode": {}, + "DAGOutNode": {}, + } + + for node in nodes: + if isinstance(node, DAGOpNode): + # qubit features + qubit_properties: Dict[int, Dict[str, Any]] = { + i: {} for i in range(3) + } # as 3 is max number of operable gate size + if node.name != "barrier" and len(node.qargs) > 3: + raise BlackwaterException( + "Non barrier gate that has more than 3 qubits." + "Those tyoe of gates are not supported yet." + ) + + if node.name != "barrier": # barriers are more than 3 qubits + for i, qubit in enumerate(node.qargs): + qubit_properties[i] = properties["qubits_props"][qubit.index] + + t1_vector = [v.get("t1", 0.0) for v in qubit_properties.values()] + t2_vector = [v.get("t2", 0.0) for v in qubit_properties.values()] + readout_error_vector = [ + v.get("readout_error", 0.0) for v in qubit_properties.values() + ] + + qubit_feature_vector = t1_vector + t2_vector + readout_error_vector + + # gate features + index = len(list(nodes_dict["DAGOpNode"].keys())) + + instruction_key = ( + f"{node.op.name}_" + f"{'_'.join([str(args.index) for args in list(node.qargs)])}" + ) + + gate_props = properties["gate_props"].get(instruction_key, {}) + gate_props = {**{"gate_error": 0.0, "gate_length": 0.0}, **gate_props} + if "index" in gate_props: + del gate_props["index"] + + # one hot encoding of gate type + gate_type_feature = [0.0 for _ in range(len(gate_type_feature_map))] + gate_type_feature[gate_type_feature_map[node.op.name]] = 1.0 + + # gate parameter values feature vector + gate_params_feature_vector = [ + 0.0, + 0.0, + 0.0, + ] # 3 is max number of parameter in any instruction + for idx, p in enumerate(node.op.params): + if isinstance(p, (float, int)): + gate_params_feature_vector[idx] = float(p) + elif p.is_real(): + gate_params_feature_vector[idx] = float(p._symbol_expr) + + # gate properties + gate_props_feature_vector = [ + gate_props["gate_error"], + gate_props["gate_length"], + ] + + feature_vector = gate_params_feature_vector + gate_type_feature + if use_qubit_features: + feature_vector += qubit_feature_vector + if use_gate_features: + feature_vector += gate_props_feature_vector + + nodes_dict["DAGOpNode"][node] = { + "index": index, + "type": "DAGOpNode", + "name": node.op.name, + "num_qubits": node.op.num_qubits, + "num_clbits": node.op.num_clbits, + "params": [], + "feature_vector": feature_vector, + **gate_props, + } + + elif isinstance(node, (DAGInNode, DAGOutNode)): + node_type_key = str(type(node)).split("'")[1].split(".")[-1] + index = len(list(nodes_dict[node_type_key].keys())) + nodes_dict[node_type_key][node] = { + "index": index, + "type": node_type_key, + "register": node.wire.register.name, + "bit": node.wire.index, + "feature_vector": [0, 0], + } + + # get edge data + edge_dict = {} + + for edge in edges: + source, dest, wire = edge + source_type = str(type(source)).split("'")[1].split(".")[-1] + dest_type = str(type(dest)).split("'")[1].split(".")[-1] + + source = nodes_dict[source_type][source] + dest = nodes_dict[dest_type][dest] + + if isinstance(wire, Qubit): + edge_attrs = properties["qubits_props"][wire.index] + key = (source_type, "wire", dest_type) + + if key not in edge_dict: + edge_dict[key] = { + "edge_index": [[source["index"], dest["index"]]], + "edge_attr": [ + [ + edge_attrs["t1"], + edge_attrs["t2"], + edge_attrs["readout_error"], + ] + ], + } + else: + edge_dict[key]["edge_index"].append([source["index"], dest["index"]]) + edge_dict[key]["edge_attr"].append( + [edge_attrs["t1"], edge_attrs["t2"], edge_attrs["readout_error"]] + ) + + # form data + data: Dict[str, Dict[str, Any]] = {"nodes": {}, "edges": {}} + + data["nodes"]["DAGOpNode"] = [ + node["feature_vector"] for node in nodes_dict["DAGOpNode"].values() + ] + data["nodes"]["DAGInNode"] = [ + node["feature_vector"] for node in nodes_dict["DAGInNode"].values() + ] + data["nodes"]["DAGOutNode"] = [ + node["feature_vector"] for node in nodes_dict["DAGOutNode"].values() + ] + + for key, d in edge_dict.items(): + edge_index = np.array(d["edge_index"]).T.tolist() + edge_attr = d["edge_attr"] + + data["edges"]["_".join(list(key))] = { + "edge_index": edge_index, + "edge_attr": edge_attr, + } + + # # get measurements + # sim_ideal = AerSimulator() + # sim_noisy = AerSimulator.from_backend(backend) + # + # result_ideal = sim_ideal.run(circuit).result().get_counts() + # result_noisy = sim_noisy.run(circuit).result().get_counts() + # + # data["y"] = { + # "ideal": counts_to_feature_vector(result_ideal, properties["num_qubits"]), + # "nosiy": counts_to_feature_vector(result_noisy, properties["num_qubits"]) + # } + + return data + + +def create_counts_meas_data( + backend: BackendV1, circuit: QuantumCircuit, properties: Dict[str, Any] +): + """Creates counts measurement for circuit + + Args: + backend: backend + circuit: circuit + properties: backend properties + + Returns: + dict of ideal and noisy measurements + """ + # get measurements + sim_ideal = AerSimulator() + sim_noisy = AerSimulator.from_backend(backend) + + result_ideal = sim_ideal.run(circuit).result().get_counts() + result_noisy = sim_noisy.run(circuit).result().get_counts() + + return { + "ideal": counts_to_feature_vector(result_ideal, properties["num_qubits"]), + "nosiy": counts_to_feature_vector(result_noisy, properties["num_qubits"]), + } + + +# pylint: disable=no-value-for-parameter +def create_estimator_meas_data( + backend: BackendV1, circuit: QuantumCircuit, observable: PauliSumOp +): + """Runs Aer estimator with noisy and ideal setup.""" + ideal_estimator = AerEstimator() + ideal_result = ideal_estimator.run([circuit], [observable]) + ideal_exp_value = ideal_result.result().values[0] + + noisy_estimator = AerEstimator() + noisy_simulator = AerSimulator().from_backend(backend) + noisy_estimator._backend = noisy_simulator + noisy_result = noisy_estimator.run([circuit], [observable]) + noisy_exp_value = noisy_result.result().values[0] + return ideal_exp_value, noisy_exp_value + + +def create_meas_data_from_estimators( + circuits: QuantumCircuit, + observables: SparsePauliOp, + estimators: List[BaseEstimator], + **run_params, +): + """Returns exp values results from given estimators for + given circuit and observable. + + Args: + circuits: circuit + observables: observable + estimators: list of esimators to use + **run_params: estimator run arguments + + Returns: + list of exp values for given estimators + """ + results = [] + for estimator in estimators: + result = estimator.run(circuits, observables, **run_params).result() + results.append(result.values[0]) + return results + + +def encode_pauli_sum_op(op: Union[PauliSumOp, SparsePauliOp]): + """Encodes pauli sum operator + + Args: + op: operator + + Returns: + encoded representation of operator + """ + + if isinstance(op, SparsePauliOp): + op = PauliSumOp.from_list(op.to_list()) + + mapping = { + "X": [0, 0, 0, 1], + "Y": [0, 0, 1, 0], + "Z": [0, 1, 0, 0], + "I": [1, 0, 0, 0], + } + coeffs = [k.coeffs[0].real for k in op] + strings = [str(k.primitive.paulis[0]) for k in op] + rows = [] + for c, pauli in zip(coeffs, strings): + encoded_row = [c] + for p in pauli: + encoded_row += mapping.get(p, [0, 0, 0, 0]) + rows.append(encoded_row) + return rows + + +def generate_random_pauli_sum_op( + n_qubits: int, size: int, coeff: Optional[float] = None +) -> PauliSumOp: + """Generates random pauli sum op.""" + paulis = [] + coeffs = ( + [coeff] * size + if coeff + else np.random.uniform(low=-1.0, high=1.0, size=(size,)).tolist() + ) + for coefficient, pauli in zip( + coeffs, random_pauli_list(n_qubits, size, phase=False) + ): + paulis.append((str(pauli), coefficient)) + return PauliSumOp.from_list(paulis) diff --git a/blackwater/data/generators/exp_val.py b/blackwater/data/generators/exp_val.py index 160ec7f..2612758 100644 --- a/blackwater/data/generators/exp_val.py +++ b/blackwater/data/generators/exp_val.py @@ -9,11 +9,11 @@ from qiskit.providers import BackendV1 from torch_geometric.data import Data -from blackwater.data.utils import ( +from blackwater.data.encoders.utils import ( + get_backend_properties_v1, + circuit_to_graph_data_json, generate_random_pauli_sum_op, create_estimator_meas_data, - circuit_to_graph_data_json, - get_backend_properties_v1, encode_pauli_sum_op, ) diff --git a/blackwater/data/generators/rb.py b/blackwater/data/generators/rb.py index da098ac..b82ee02 100644 --- a/blackwater/data/generators/rb.py +++ b/blackwater/data/generators/rb.py @@ -7,14 +7,14 @@ from qiskit.quantum_info import Operator from qiskit_experiments.library import StandardRB -from blackwater.data.generators.exp_val import ExpValueEntry -from blackwater.data.utils import ( +from blackwater.data.encoders.utils import ( + get_backend_properties_v1, circuit_to_graph_data_json, generate_random_pauli_sum_op, - get_backend_properties_v1, create_estimator_meas_data, encode_pauli_sum_op, ) +from blackwater.data.generators.exp_val import ExpValueEntry def generate_rb_circuit( diff --git a/blackwater/data/utils.py b/blackwater/data/utils.py index 4d53d01..68ffbd6 100644 --- a/blackwater/data/utils.py +++ b/blackwater/data/utils.py @@ -1,507 +1 @@ """Data utilities.""" -from typing import Optional, List, Dict, Union, Any - -import numpy as np -import torch -from qiskit import QuantumCircuit -from qiskit.circuit import Qubit -from qiskit.converters import circuit_to_dag -from qiskit.dagcircuit import DAGOpNode, DAGInNode, DAGOutNode -from qiskit.opflow import PauliSumOp -from qiskit.primitives import BaseEstimator -from qiskit.providers import BackendV1 -from qiskit.quantum_info import random_pauli_list, SparsePauliOp -from qiskit_aer import AerSimulator -from qiskit_aer.primitives import Estimator as AerEstimator -from torch_geometric.data import Data - -# pylint: disable=no-member -from blackwater.exception import BlackwaterException - -available_gate_names = [ - # one qubit gates - "id", - "u1", - "u2", - "u3", - "x", - "y", - "z", - "h", - "s", - "sdg", - "t", - "tdg", - "rx", - "ry", - "rz", - # two qubit gates - "cx", - "cy", - "cz", - "ch", - "crz", - "cu1", - "cu3", - "swap", - "rzz", - # three qubit gates - "ccx", - "cswap", -] - - -def circuit_to_pyg_data( - circuit: QuantumCircuit, gate_set: Optional[List[str]] = None -) -> Data: - """Convert circuit to Pytorch geometric data. - Note: Homogenous conversion. - - Args: - circuit: quantum circuit - gate_set: list of instruction to use to encode data. - if None, default instruction set will be used - - Returns: - Pytorch geometric data - """ - num_qubits = circuit.num_qubits - gate_set = gate_set or available_gate_names - # add "other" gates - gate_set += ["barrier", "measure", "delay"] - - dag_circuit = circuit_to_dag(circuit) - - nodes = list(dag_circuit.nodes()) - edges = list(dag_circuit.edges()) - - nodes_dict: Dict[DAGOpNode, List[float]] = {} - - for _, node in enumerate(nodes): - if isinstance(node, (DAGInNode, DAGOutNode)): - # TODO: use in and out nodes - pass - elif isinstance(node, DAGOpNode): - # TODO: use node.cargs - - # get information on which qubits this gate is operating - affected_qubits = [0.0] * num_qubits - for arg in node.qargs: - affected_qubits[arg.index] = 1.0 - - # encoding of gate name - gate_encoding = [0.0] * len(gate_set) - gate_encoding[gate_set.index(node.op.name)] = 1.0 - - # gate parameters - node_params = [0.0, 0.0, 0.0] - for i, param in enumerate(node.op.params): - node_params[i] = param - - feature_vector = gate_encoding + affected_qubits + node_params - - nodes_dict[node] = feature_vector - - nodes_indices = {node: idx for idx, node in enumerate(nodes_dict.keys())} - - edge_index = [] - edge_attr = [] - - for edge in edges: - source, dest, _ = edge - - if isinstance(source, DAGOpNode) and isinstance(dest, DAGOpNode): - edge_index.append([nodes_indices[source], nodes_indices[dest]]) - edge_attr.append([0.0]) - else: - # TODO: handle in and out nodes - pass - - return Data( - x=torch.tensor(list(nodes_dict.values()), dtype=torch.float), - edge_index=torch.tensor(np.transpose(edge_index), dtype=torch.long), - edge_attr=torch.tensor(np.transpose(edge_attr), dtype=torch.float), - circuit_depth=torch.tensor([[circuit.depth()]], dtype=torch.long), - ) - - -def gate_to_index(gate: Any): - """Converts gate to key: - f(gate(cx, [0, 1])) -> 'cx_0_1' - - Args: - gate: gate - - Returns: - key - """ - return f"{gate.gate}_{'_'.join([str(i) for i in gate.qubits])}" - - -def get_backend_properties_v1(backend: BackendV1): - """Get properties from BackendV1 - - Args: - backend: backend - - Returns: - json with backend information - """ - props = backend.properties() - - def get_parameters(gate): - return { - **{"gate_error": 0.0, "gate_length": 0.0}, - **{param.name: param.value for param in gate.parameters}, - } - - return { - "name": backend.name(), - "gates_set": list({g.gate for g in props.gates}), - "num_qubits": len(props.qubits), - "qubits_props": { - index: { - "index": index, - "t1": props.qubit_property(index).get("T1", (0, 0))[0], - "t2": props.qubit_property(index).get("T2", (0, 0))[0], - "readout_error": props.qubit_property(index).get( - "readout_error", (0, 0) - )[0], - } - for index in range(len(props.qubits)) - }, - "gate_props": { - gate_to_index(gate): {"index": gate_to_index(gate), **get_parameters(gate)} - for gate in props.gates - }, - } - - -def counts_to_feature_vector(counts: dict, num_qubits: int) -> List[float]: - """Convert counts to feature vector. - - Args: - counts: counts - num_qubits: number of qubits - - Returns: - list of floats - """ - count_format = "{:0" + str(num_qubits) + "b}" - all_possible_measurements = { - count_format.format(i): 0 for i in range(2**num_qubits) - } - - shots = sum(counts.values()) - all_counts = {**all_possible_measurements, **counts} - return list(float(v) / shots for v in all_counts.values()) - - -def circuit_to_graph_data_json( - circuit: QuantumCircuit, - properties: dict, - use_gate_features: bool = False, - use_qubit_features: bool = False, -): - """Converts circuit to json (dict) for PyG data. - - Args: - circuit: quantum circuit - properties: call get_backend_properties_v1 for backend - use_gate_features: use gate features in data graph - use_qubit_features: use qubit features in data graph - """ - - # feature map for gate types - additional_gate_types = [ - "barrier", - "measure", - # "delay" - ] - gate_type_feature_map = { - g_name: index - for index, g_name in enumerate(properties["gates_set"] + additional_gate_types) - } - - # convert to dag - dag_circuit = circuit_to_dag(circuit) - - nodes = list(dag_circuit.nodes()) - edges = list(dag_circuit.edges()) - - # get node data - nodes_dict: Dict[str, Dict[str, Any]] = { - "DAGOpNode": {}, - "DAGInNode": {}, - "DAGOutNode": {}, - } - - for node in nodes: - if isinstance(node, DAGOpNode): - # qubit features - qubit_properties: Dict[int, Dict[str, Any]] = { - i: {} for i in range(3) - } # as 3 is max number of operable gate size - if node.name != "barrier" and len(node.qargs) > 3: - raise BlackwaterException( - "Non barrier gate that has more than 3 qubits." - "Those tyoe of gates are not supported yet." - ) - - if node.name != "barrier": # barriers are more than 3 qubits - for i, qubit in enumerate(node.qargs): - qubit_properties[i] = properties["qubits_props"][qubit.index] - - t1_vector = [v.get("t1", 0.0) for v in qubit_properties.values()] - t2_vector = [v.get("t2", 0.0) for v in qubit_properties.values()] - readout_error_vector = [ - v.get("readout_error", 0.0) for v in qubit_properties.values() - ] - - qubit_feature_vector = t1_vector + t2_vector + readout_error_vector - - # gate features - index = len(list(nodes_dict["DAGOpNode"].keys())) - - instruction_key = ( - f"{node.op.name}_" - f"{'_'.join([str(args.index) for args in list(node.qargs)])}" - ) - - gate_props = properties["gate_props"].get(instruction_key, {}) - gate_props = {**{"gate_error": 0.0, "gate_length": 0.0}, **gate_props} - if "index" in gate_props: - del gate_props["index"] - - # one hot encoding of gate type - gate_type_feature = [0.0 for _ in range(len(gate_type_feature_map))] - gate_type_feature[gate_type_feature_map[node.op.name]] = 1.0 - - # gate parameter values feature vector - gate_params_feature_vector = [ - 0.0, - 0.0, - 0.0, - ] # 3 is max number of parameter in any instruction - for idx, p in enumerate(node.op.params): - if isinstance(p, (float, int)): - gate_params_feature_vector[idx] = float(p) - elif p.is_real(): - gate_params_feature_vector[idx] = float(p._symbol_expr) - - # gate properties - gate_props_feature_vector = [ - gate_props["gate_error"], - gate_props["gate_length"], - ] - - feature_vector = gate_params_feature_vector + gate_type_feature - if use_qubit_features: - feature_vector += qubit_feature_vector - if use_gate_features: - feature_vector += gate_props_feature_vector - - nodes_dict["DAGOpNode"][node] = { - "index": index, - "type": "DAGOpNode", - "name": node.op.name, - "num_qubits": node.op.num_qubits, - "num_clbits": node.op.num_clbits, - "params": [], - "feature_vector": feature_vector, - **gate_props, - } - - elif isinstance(node, (DAGInNode, DAGOutNode)): - node_type_key = str(type(node)).split("'")[1].split(".")[-1] - index = len(list(nodes_dict[node_type_key].keys())) - nodes_dict[node_type_key][node] = { - "index": index, - "type": node_type_key, - "register": node.wire.register.name, - "bit": node.wire.index, - "feature_vector": [0, 0], - } - - # get edge data - edge_dict = {} - - for edge in edges: - source, dest, wire = edge - source_type = str(type(source)).split("'")[1].split(".")[-1] - dest_type = str(type(dest)).split("'")[1].split(".")[-1] - - source = nodes_dict[source_type][source] - dest = nodes_dict[dest_type][dest] - - if isinstance(wire, Qubit): - edge_attrs = properties["qubits_props"][wire.index] - key = (source_type, "wire", dest_type) - - if key not in edge_dict: - edge_dict[key] = { - "edge_index": [[source["index"], dest["index"]]], - "edge_attr": [ - [ - edge_attrs["t1"], - edge_attrs["t2"], - edge_attrs["readout_error"], - ] - ], - } - else: - edge_dict[key]["edge_index"].append([source["index"], dest["index"]]) - edge_dict[key]["edge_attr"].append( - [edge_attrs["t1"], edge_attrs["t2"], edge_attrs["readout_error"]] - ) - - # form data - data: Dict[str, Dict[str, Any]] = {"nodes": {}, "edges": {}} - - data["nodes"]["DAGOpNode"] = [ - node["feature_vector"] for node in nodes_dict["DAGOpNode"].values() - ] - data["nodes"]["DAGInNode"] = [ - node["feature_vector"] for node in nodes_dict["DAGInNode"].values() - ] - data["nodes"]["DAGOutNode"] = [ - node["feature_vector"] for node in nodes_dict["DAGOutNode"].values() - ] - - for key, d in edge_dict.items(): - edge_index = np.array(d["edge_index"]).T.tolist() - edge_attr = d["edge_attr"] - - data["edges"]["_".join(list(key))] = { - "edge_index": edge_index, - "edge_attr": edge_attr, - } - - # # get measurements - # sim_ideal = AerSimulator() - # sim_noisy = AerSimulator.from_backend(backend) - # - # result_ideal = sim_ideal.run(circuit).result().get_counts() - # result_noisy = sim_noisy.run(circuit).result().get_counts() - # - # data["y"] = { - # "ideal": counts_to_feature_vector(result_ideal, properties["num_qubits"]), - # "nosiy": counts_to_feature_vector(result_noisy, properties["num_qubits"]) - # } - - return data - - -def create_counts_meas_data( - backend: BackendV1, circuit: QuantumCircuit, properties: Dict[str, Any] -): - """Creates counts measurement for circuit - - Args: - backend: backend - circuit: circuit - properties: backend properties - - Returns: - dict of ideal and noisy measurements - """ - # get measurements - sim_ideal = AerSimulator() - sim_noisy = AerSimulator.from_backend(backend) - - result_ideal = sim_ideal.run(circuit).result().get_counts() - result_noisy = sim_noisy.run(circuit).result().get_counts() - - return { - "ideal": counts_to_feature_vector(result_ideal, properties["num_qubits"]), - "nosiy": counts_to_feature_vector(result_noisy, properties["num_qubits"]), - } - - -# pylint: disable=no-value-for-parameter -def create_estimator_meas_data( - backend: BackendV1, circuit: QuantumCircuit, observable: PauliSumOp -): - """Runs Aer estimator with noisy and ideal setup.""" - ideal_estimator = AerEstimator() - ideal_result = ideal_estimator.run([circuit], [observable]) - ideal_exp_value = ideal_result.result().values[0] - - noisy_estimator = AerEstimator() - noisy_simulator = AerSimulator().from_backend(backend) - noisy_estimator._backend = noisy_simulator - noisy_result = noisy_estimator.run([circuit], [observable]) - noisy_exp_value = noisy_result.result().values[0] - return ideal_exp_value, noisy_exp_value - - -def create_meas_data_from_estimators( - circuits: QuantumCircuit, - observables: SparsePauliOp, - estimators: List[BaseEstimator], - **run_params, -): - """Returns exp values results from given estimators for - given circuit and observable. - - Args: - circuits: circuit - observables: observable - estimators: list of esimators to use - **run_params: estimator run arguments - - Returns: - list of exp values for given estimators - """ - results = [] - for estimator in estimators: - result = estimator.run(circuits, observables, **run_params).result() - results.append(result.values[0]) - return results - - -def encode_pauli_sum_op(op: Union[PauliSumOp, SparsePauliOp]): - """Encodes pauli sum operator - - Args: - op: operator - - Returns: - encoded representation of operator - """ - - if isinstance(op, SparsePauliOp): - op = PauliSumOp.from_list(op.to_list()) - - mapping = { - "X": [0, 0, 0, 1], - "Y": [0, 0, 1, 0], - "Z": [0, 1, 0, 0], - "I": [1, 0, 0, 0], - } - coeffs = [k.coeffs[0].real for k in op] - strings = [str(k.primitive.paulis[0]) for k in op] - rows = [] - for c, pauli in zip(coeffs, strings): - encoded_row = [c] - for p in pauli: - encoded_row += mapping.get(p, [0, 0, 0, 0]) - rows.append(encoded_row) - return rows - - -def generate_random_pauli_sum_op( - n_qubits: int, size: int, coeff: Optional[float] = None -) -> PauliSumOp: - """Generates random pauli sum op.""" - paulis = [] - coeffs = ( - [coeff] * size - if coeff - else np.random.uniform(low=-1.0, high=1.0, size=(size,)).tolist() - ) - for coefficient, pauli in zip( - coeffs, random_pauli_list(n_qubits, size, phase=False) - ): - paulis.append((str(pauli), coefficient)) - return PauliSumOp.from_list(paulis) diff --git a/tests/data/encoders/test_graph.py b/tests/data/encoders/test_graph.py index 934997c..de8adb3 100644 --- a/tests/data/encoders/test_graph.py +++ b/tests/data/encoders/test_graph.py @@ -4,7 +4,7 @@ from qiskit import QuantumCircuit, transpile from qiskit.providers.fake_provider import FakeLimaV2 -from blackwater.data.encoders.torch import ( +from blackwater.data.encoders.graph_utils import ( circuit_to_json_graph, GraphData, BackendNodeEncoder, diff --git a/tests/data/encoders/test_numpy.py b/tests/data/encoders/test_numpy.py index be3adc3..ad00dcb 100644 --- a/tests/data/encoders/test_numpy.py +++ b/tests/data/encoders/test_numpy.py @@ -5,8 +5,8 @@ import numpy as np from qiskit.circuit.random import random_circuit -from blackwater.data.encoders.numpy import DefaultNumpyEstimatorInputEncoder -from blackwater.data.utils import generate_random_pauli_sum_op +from blackwater.data import DefaultNumpyEstimatorInputEncoder +from blackwater.data.encoders.utils import generate_random_pauli_sum_op class TestNumpyEncoders(TestCase): diff --git a/tests/data/io_tests/test_io.py b/tests/data/io_tests/test_io.py index c291cbb..db5a91b 100644 --- a/tests/data/io_tests/test_io.py +++ b/tests/data/io_tests/test_io.py @@ -9,7 +9,7 @@ from qiskit.quantum_info import SparsePauliOp from blackwater.data.dataio import ExpValDataWriter, ExpValDataReader -from blackwater.data.encoders.torch import ExpValData +from blackwater.data.encoders.graph_utils import ExpValData from tests.data.encoders.test_graph import create_bell_circuit diff --git a/tests/data/loaders/test_dataclasses_loader.py b/tests/data/loaders/test_dataclasses_loader.py index 7d96b0a..42259dc 100644 --- a/tests/data/loaders/test_dataclasses_loader.py +++ b/tests/data/loaders/test_dataclasses_loader.py @@ -10,7 +10,7 @@ from torch_geometric.data import Data from blackwater.data.dataio import ExpValDataWriter -from blackwater.data.encoders.torch import ExpValData +from blackwater.data.encoders.graph_utils import ExpValData from blackwater.data.loaders.dataclasses import ExpValDataSet from tests.data.encoders.test_graph import create_bell_circuit diff --git a/tests/data/test_dataclasses.py b/tests/data/test_dataclasses.py index 1972b94..84c4756 100644 --- a/tests/data/test_dataclasses.py +++ b/tests/data/test_dataclasses.py @@ -7,7 +7,7 @@ from qiskit.quantum_info import SparsePauliOp from torch_geometric.data import Data -from blackwater.data.encoders.torch import ExpValData +from blackwater.data.encoders.graph_utils import ExpValData from tests.data.encoders.test_graph import create_bell_circuit diff --git a/tests/data/test_utils.py b/tests/data/test_utils.py index 71162db..8bb5a35 100644 --- a/tests/data/test_utils.py +++ b/tests/data/test_utils.py @@ -4,7 +4,7 @@ from qiskit import QuantumCircuit from torch_geometric.data import Data -from blackwater.data.utils import circuit_to_pyg_data +from blackwater.data.encoders.utils import circuit_to_pyg_data class TestDataUtils(TestCase): diff --git a/tests/primitives/test_learning_estimator.py b/tests/primitives/test_learning_estimator.py index e759362..8160739 100644 --- a/tests/primitives/test_learning_estimator.py +++ b/tests/primitives/test_learning_estimator.py @@ -10,9 +10,9 @@ from sklearn.datasets import make_regression from sklearn.ensemble import RandomForestRegressor -from blackwater.data.encoders.numpy import DefaultNumpyEstimatorInputEncoder -from blackwater.data.encoders.torch import DefaultPyGEstimatorEncoder -from blackwater.data.utils import generate_random_pauli_sum_op +from blackwater.data import DefaultNumpyEstimatorInputEncoder +from blackwater.data.encoders.graph_utils import DefaultPyGEstimatorEncoder +from blackwater.data.encoders.utils import generate_random_pauli_sum_op from blackwater.primitives.learning_esimator import ( learning_estimator, ScikitLearnEstimatorModel,