diff --git a/setup.py b/setup.py index c35992f..6477683 100755 --- a/setup.py +++ b/setup.py @@ -14,12 +14,14 @@ def generate_cython_stub_file(pyx_filepath: str, output_filepath: str) -> None: # load file contents pyx_content = open(pyx_filepath, "r").read() - # strip cython syntax, empty lines, and comments + # Replace 'cimport' with 'import' + pyx_content = re.sub(r'\bcimport\b', 'import', pyx_content) + + # strip cython syntax and comments pyx_content = re.sub("cdef ", "", pyx_content) - pyx_content = re.sub(r"^\s*\n", "", pyx_content, flags=re.MULTILINE) pyx_content = re.sub(r"^\s*#.*\n", "", pyx_content, flags=re.MULTILINE) - # identify top-level import lines + # identify top-level import lines (including modified 'cimport' lines) pattern = re.compile(r"^(import|from)\s+.*\n", re.MULTILINE) for match in pattern.finditer(pyx_content): pyi_content += pyx_content[match.start() : match.end()] @@ -33,19 +35,23 @@ def generate_cython_stub_file(pyx_filepath: str, output_filepath: str) -> None: docstring_double = r"\"\"\".*?\"\"\"" docstring_single = r"'''.*?'''" docstring = rf"\s*?(?:{docstring_double}|{docstring_single})\s*?\n" - pattern = re.compile(rf"({decorator})?({declaration})({docstring})?", re.DOTALL | re.MULTILINE) + pattern = re.compile( + rf"({decorator})?({declaration})({docstring})?", re.DOTALL | re.MULTILINE + ) for match in pattern.finditer(pyx_content): content = pyx_content[match.start() : match.end()] if not ignore_pattern.match(content, re.MULTILINE): pyi_content += content.rstrip() # strip trailing whitespace - # If there is a docstring, we only need to add a newline character. Otherwise, we also - # need to add ellipses as a placeholder for the class/method "body". + # If there is a docstring, we only need to add a newline character. + # Otherwise, we also need to add ellipses as a placeholder for the class/method "body". suffix = "\n" if match.group(3) else " ...\n" pyi_content += suffix open(output_filepath, "w").write(pyi_content) + + ## BUILD if sys.platform == "darwin": diff --git a/src_python/ldpc/belief_find_decoder/__init__.pyi b/src_python/ldpc/belief_find_decoder/__init__.pyi index 4b8af69..b4ef4d2 100644 --- a/src_python/ldpc/belief_find_decoder/__init__.pyi +++ b/src_python/ldpc/belief_find_decoder/__init__.pyi @@ -1,12 +1,16 @@ import numpy as np from scipy.sparse import spmatrix + class BeliefFindDecoder(BpDecoderBase): + """ A class representing a decoder that combines Belief Propagation (BP) with the Union Find Decoder (UFD) algorithm. + The BeliefFindDecoder is designed to decode binary linear codes by initially attempting BP decoding, and if that fails, it falls back to the Union Find Decoder algorithm. The UFD algorithm is based on the principles outlined in https://arxiv.org/abs/1709.06218, with an option to utilise a more general version as described in https://arxiv.org/abs/2103.08049 for LDPC codes by setting `uf_method=True`. + Parameters ---------- pcm : Union[np.ndarray, scipy.sparse.spmatrix] @@ -37,29 +41,45 @@ class BeliefFindDecoder(BpDecoderBase): The inversion method can be applied to any parity check matrix. bits_per_step : int, optional Specifies the number of bits added to the cluster in each step of the UFD algorithm. If no value is provided, this is set the block length of the code. + Notes ----- The `BeliefFindDecoder` class leverages soft information outputted by the BP decoder to guide the cluster growth in the UFD algorithm. The number of bits added to the cluster in each step is controlled by the `bits_per_step` parameter. The `uf_method` parameter activates a more general version of the UFD algorithm suitable for LDPC codes when set to True. """ + + def __cinit__(self, pcm: Union[np.ndarray, scipy.sparse.spmatrix], error_rate: Optional[float] = None, + error_channel: Optional[List[float]] = None, max_iter: Optional[int] = 0, bp_method: Optional[str] = 'minimum_sum', + ms_scaling_factor: Optional[float] = 1.0, schedule: Optional[str] = 'parallel', omp_thread_count: Optional[int] = 1, + random_schedule_seed: Optional[int] = 0, serial_schedule_order: Optional[List[int]] = None, uf_method: str = "peeling", bits_per_step:int = 0, input_vector_type: str = "syndrome"): ... + + def __del__(self): ... + def decode(self,syndrome): + """ Decodes the input syndrome using the belief propagation and UFD decoding methods. + Initially, the method attempts to decode the syndrome using belief propagation. If this fails to converge, it falls back to the UFD algorithm. + Parameters ---------- syndrome : np.ndarray The input syndrome to decode. + Returns ------- np.ndarray The decoded output. + Raises ------ ValueError If the length of the input syndrome is not equal to the length of the code. """ + + @property def uf_method(self): ... diff --git a/src_python/ldpc/bp_decoder/__init__.pyi b/src_python/ldpc/bp_decoder/__init__.pyi index 1296114..2883841 100644 --- a/src_python/ldpc/bp_decoder/__init__.pyi +++ b/src_python/ldpc/bp_decoder/__init__.pyi @@ -3,206 +3,282 @@ import scipy.sparse from typing import Optional, List, Union import warnings import ldpc.helpers.scipy_helpers + + def io_test(pcm: Union[scipy.sparse.spmatrix,np.ndarray]): ... + + + class BpDecoderBase: + """ Bp Decoder base class """ + + def __cinit__(self,pcm, **kwargs): ... + + def __del__(self): ... + @property def error_rate(self) -> np.ndarray: """ Returns the current error rate vector. + Returns: np.ndarray: A numpy array containing the current error rate vector. """ + @error_rate.setter def error_rate(self, value: Optional[float]) -> None: """ Sets the error rate for the decoder. + Args: value (Optional[float]): The error rate value to be set. Must be a single float value. """ + @property def error_channel(self) -> np.ndarray: """ Returns the current error channel vector. + Returns: np.ndarray: A numpy array containing the current error channel vector. """ + @error_channel.setter def error_channel(self, value: Union[Optional[List[float]],np.ndarray]) -> None: """ Sets the error channel for the decoder. + Args: value (Optional[List[float]]): The error channel vector to be set. Must have length equal to the block length of the code `self.n`. """ + def update_channel_probs(self, value: Union[List[float],np.ndarray]) -> None: ... + @property def channel_probs(self) -> np.ndarray: ... + + @property def input_vector_type(self)-> str: """ Returns the current input vector type. + Returns: str: The current input vector type. """ + + @input_vector_type.setter def input_vector_type(self, input_type: str): """ Sets the input vector type. + Args: input_type (str): The input vector type to be set. Must be either 'syndrome' or 'received_vector'. """ + + @property def log_prob_ratios(self) -> np.ndarray: """ Returns the current log probability ratio vector. + Returns: np.ndarray: A numpy array containing the current log probability ratio vector. """ + @property def converge(self) -> bool: """ Returns whether the decoder has converged or not. + Returns: bool: True if the decoder has converged, False otherwise. """ + @property def iter(self) -> int: """ Returns the number of iterations performed by the decoder. + Returns: int: The number of iterations performed by the decoder. """ + + @property def check_count(self) -> int: """ Returns the number of rows of the parity check matrix. + Returns: int: The number of rows of the parity check matrix. """ + @property def bit_count(self) -> int: """ Returns the number of columns of the parity check matrix. + Returns: int: The number of columns of the parity check matrix. """ + @property def max_iter(self) -> int: """ Returns the maximum number of iterations allowed by the decoder. + Returns: int: The maximum number of iterations allowed by the decoder. """ + @max_iter.setter def max_iter(self, value: int) -> None: """ Sets the maximum number of iterations allowed by the decoder. + Args: value (int): The maximum number of iterations allowed by the decoder. + Raises: ValueError: If value is not a positive integer. """ + @property def bp_method(self) -> str: """ Returns the belief propagation method used. + Returns: str: The belief propagation method used. Possible values are 'product_sum' or 'minimum_sum'. """ + @bp_method.setter def bp_method(self, value: Union[str,int]) -> None: """ Sets the belief propagation method used. + Args: value (str): The belief propagation method to use. Possible values are 'product_sum' or 'minimum_sum'. + Raises: ValueError: If value is not a valid option. """ + @property def schedule(self) -> str: """ Returns the scheduling method used. + Returns: str: The scheduling method used. Possible values are 'parallel' or 'serial'. """ + @schedule.setter def schedule(self, value: Union[str,int]) -> None: """ Sets the scheduling method used. + Args: value (str): The scheduling method to use. Possible values are 'parallel' or 'serial'. + Raises: ValueError: If value is not a valid option. """ + @property def serial_schedule_order(self) -> Union[None, np.ndarray]: """ Returns the serial schedule order. + Returns: Union[None, np.ndarray]: The serial schedule order as a numpy array, or None if no schedule has been set. """ + @serial_schedule_order.setter def serial_schedule_order(self, value: Union[None, List[int], np.ndarray]) -> None: """ Sets the serial schedule order. + Args: value (Union[None, List[int]]): The serial schedule order to set. Must have length equal to the block length of the code `self.n`. + Raises: Exception: If value does not have the correct length. ValueError: If value contains an invalid integer value. """ + @property def ms_scaling_factor(self) -> float: """Get the scaling factor for minimum sum method. + Returns: float: The current scaling factor. """ + @ms_scaling_factor.setter def ms_scaling_factor(self, value: float) -> None: """Set the scaling factor for minimum sum method. + Args: value (float): The new scaling factor. + Raises: TypeError: If the input value is not a float. """ + @property def omp_thread_count(self) -> int: """Get the number of OpenMP threads. + Returns: int: The number of threads used. """ + @omp_thread_count.setter def omp_thread_count(self, value: int) -> None: """Set the number of OpenMP threads. + Args: value (int): The number of threads to use. + Raises: TypeError: If the input value is not an integer or is less than 1. """ + @property def random_schedule_seed(self) -> int: """Get the value of random_schedule_seed. + Returns: int: The current value of random_schedule_seed. """ + @random_schedule_seed.setter def random_schedule_seed(self, value: int) -> None: """Set the value of random_schedule_seed. + Args: value (int): The new value of random_schedule_seed. + Raises: ValueError: If the input value is not a postive integer. """ + class BpDecoder(BpDecoderBase): """ Belief propagation decoder for binary linear codes. + This class provides an implementation of belief propagation decoding for binary linear codes. The decoder uses a sparse parity check matrix to decode received codewords. The decoding algorithm can be configured using various parameters, such as the belief propagation method used, the scheduling method used, and the maximum number of iterations. + Parameters ---------- pcm : Union[np.ndarray, spmatrix] @@ -230,41 +306,58 @@ class BpDecoder(BpDecoderBase): Note, it is only necessary to specify this value when the parity check matrix is square. When the parity matrix is non-square the input vector type is inferred automatically from its length. """ + + def __cinit__(self, pcm: Union[np.ndarray, scipy.sparse.spmatrix], error_rate: Optional[float] = None, + error_channel: Optional[Union[np.ndarray,List[float]]] = None, max_iter: Optional[int] = 0, bp_method: Optional[str] = 'minimum_sum', + ms_scaling_factor: Optional[float] = 1.0, schedule: Optional[str] = 'parallel', omp_thread_count: Optional[int] = 1, + random_schedule_seed: Optional[int] = 0, serial_schedule_order: Optional[List[int]] = None, input_vector_type: str = "auto", **kwargs): ... + def __init__(self, pcm: Union[np.ndarray, scipy.sparse.spmatrix], error_rate: Optional[float] = None, error_channel: Optional[Union[np.ndarray,List[float]]] = None, max_iter: Optional[int] = 0, bp_method: Optional[str] = 'minimum_sum', ms_scaling_factor: Optional[float] = 1.0, schedule: Optional[str] = 'parallel', omp_thread_count: Optional[int] = 1, random_schedule_seed: Optional[int] = 0, serial_schedule_order: Optional[List[int]] = None, input_vector_type: str = "auto", **kwargs): ... + def decode(self, input_vector: np.ndarray) -> np.ndarray: """ Decode the input input_vector using belief propagation decoding algorithm. + Parameters ---------- input_vector : numpy.ndarray A 1D numpy array of length equal to the number of rows in the parity check matrix. + Returns ------- numpy.ndarray A 1D numpy array of length equal to the number of columns in the parity check matrix. + Raises ------ ValueError If the length of the input input_vector does not match the number of rows in the parity check matrix. """ + + @property def decoding(self) -> np.ndarray: """ Returns the current decoded output. + Returns: np.ndarray: A numpy array containing the current decoded output. """ + + class SoftInfoBpDecoder(BpDecoderBase): """ A decoder that uses soft information belief propagation algorithm for decoding binary linear codes. + This class implements a modified version of the belief propagation decoding algorithm that accounts for uncertainty in the syndrome readout using a serial belief propagation schedule. The decoder uses a minimum sum method as the belief propagation variant. For more information on the algorithm, please see the original research paper at https://arxiv.org/abs/2205.02341. + Parameters ---------- pcm : Union[np.ndarray, spmatrix] @@ -283,29 +376,41 @@ class SoftInfoBpDecoder(BpDecoderBase): cutoff : Optional[float] The threshold value below which syndrome soft information is used. """ + + def __cinit__(self, pcm: Union[np.ndarray, spmatrix], error_rate: Optional[float] = None, + error_channel: Optional[List[float]] = None, max_iter: Optional[int] = 0, bp_method: Optional[str] = 'minimum_sum', + ms_scaling_factor: Optional[float] = 1.0, cutoff: Optional[float] = np.inf, sigma: float = 2.0, **kwargs): ... + def decode(self, soft_info_syndrome: np.ndarray) -> np.ndarray: """ Decode the input syndrome using the soft information belief propagation decoding algorithm. + Parameters ---------- soft_info_syndrome: np.ndarray A 1-dimensional numpy array containing the soft information of the syndrome. + Returns ------- np.ndarray A 1-dimensional numpy array containing the decoded output. """ + @property def soft_syndrome(self) -> np.ndarray: """ Returns the current soft syndrome. + Returns: np.ndarray: A numpy array containing the current soft syndrome. """ + + @property def decoding(self) -> np.ndarray: """ Returns the current decoded output. + Returns: np.ndarray: A numpy array containing the current decoded output. """ diff --git a/src_python/ldpc/bp_flip/__init__.pyi b/src_python/ldpc/bp_flip/__init__.pyi index 9848e90..7ba8eec 100644 --- a/src_python/ldpc/bp_flip/__init__.pyi +++ b/src_python/ldpc/bp_flip/__init__.pyi @@ -2,12 +2,22 @@ import numpy as np import warnings from scipy.sparse import spmatrix from typing import Union, List, Optional + class BpFlipDecoder(BpDecoderBase): ... + + def __del__(self): ... + def decode(self, syndrome: np.ndarray) -> np.ndarray: ... + + + + + @property def decoding(self) -> np.ndarray: """ Returns the current decoded output. + Returns: np.ndarray: A numpy array containing the current decoded output. """ diff --git a/src_python/ldpc/bplsd_decoder/__init__.pyi b/src_python/ldpc/bplsd_decoder/__init__.pyi index ebc2617..c14524c 100644 --- a/src_python/ldpc/bplsd_decoder/__init__.pyi +++ b/src_python/ldpc/bplsd_decoder/__init__.pyi @@ -1,13 +1,16 @@ import numpy as np from scipy.sparse import spmatrix import json -from ldpc.bposd_decoder cimport OsdMethod +from ldpc.bposd_decoder import OsdMethod import warnings + class BpLsdDecoder(BpDecoderBase): """ A class representing a decoder that combines Belief Propagation (BP) with the Localised Statistics Decoder (LSD) algorithm. + The BpLsdDecoder is designed to decode binary linear codes by initially attempting BP decoding, and if that fails, it falls back to the Localised Statistics Decoder algorithm. + Parameters ---------- pcm : Union[np.ndarray, scipy.sparse.spmatrix] @@ -39,29 +42,49 @@ class BpLsdDecoder(BpDecoderBase): lsd_method: str, optional The LSD method of the LSD algorithm applied to each cluster. Must be one of {'LSD_0', 'LSD_E', 'LSD_CS'}. By default 'LSD_0'. + Notes ----- The `BpLsdDecoder` class leverages soft information outputted by the BP decoder to guide the cluster growth in the LSD algorithm. The number of bits added to the cluster in each step is controlled by the `bits_per_step` parameter. """ + + def __cinit__(self, pcm: Union[np.ndarray, scipy.sparse.spmatrix], error_rate: Optional[float] = None, + error_channel: Optional[List[float]] = None, max_iter: Optional[int] = 0, bp_method: Optional[str] = 'minimum_sum', + ms_scaling_factor: Optional[float] = 1.0, schedule: Optional[str] = 'parallel', + omp_thread_count: Optional[int] = 1, + random_schedule_seed: Optional[int] = 0, + serial_schedule_order: Optional[List[int]] = None, + bits_per_step:int = 1, + input_vector_type: str = "syndrome", + lsd_order: int = 0, + lsd_method: Union[str, int] = 0, **kwargs): ... + + def __del__(self): ... + def decode(self,syndrome): """ Decodes the input syndrome using the belief propagation and LSD decoding methods. + Initially, the method attempts to decode the syndrome using belief propagation. If this fails to converge, it falls back to the LSD algorithm. + Parameters ---------- syndrome : np.ndarray The input syndrome to decode. + Returns ------- np.ndarray The decoded output. + Raises ------ ValueError If the length of the input syndrome is not equal to the length of the code. """ + @property def statistics(self) -> Statistics: """ @@ -71,77 +94,97 @@ class BpLsdDecoder(BpDecoderBase): Statistics The statistics object. """ + @property def do_stats(self) -> bool: """ Returns whether the statistics are being collected. + Returns ------- bool Whether the statistics are being collected. """ + def set_do_stats(self, value: bool) -> None: """ Sets whether the statistics are being collected. + Parameters ---------- value : bool Whether the statistics are being collected. """ + @property def lsd_method(self) -> Optional[str]: """ The Localized Statistic Decoding (LSD) method used. + Returns ------- Optional[str] A string representing the LSD method used. Must be one of {'LSD_0', 'LSD_E', 'LSD_CS'}. If no LSD method has been set, returns `None`. """ + @lsd_method.setter def lsd_method(self, method: Union[str, int, float]) -> None: """ Sets the LSD method used. That is, the OSD method per cluster. + Parameters ---------- method : Union[str, int, float] A string, integer or float representing the OSD method to use. Must be one of {'LSD_0', 'LSD_E', 'LSD_CS'}, corresponding to LSD order-0, LSD Exhaustive or LSD-Cominbation-Sweep. """ + + @property def lsd_order(self) -> int: """ The LSD order used. + Returns ------- int An integer representing the OSD order used. """ + + @lsd_order.setter def lsd_order(self, order: int) -> None: """ Set the order for the LSD method. + Parameters ---------- order : int The order for the OSD method. Must be a positive integer. + Raises ------ ValueError If order is less than 0. + Warns ----- UserWarning If the LSD method is 'OSD_E' and the order is greater than 15. + """ + def set_additional_stat_fields(self, error, syndrome, compare_recover) -> None: """ Sets additional fields to be collected in the statistics. + Parameters ---------- fields : List[str] A list of strings representing the additional fields to be collected in the statistics. """ + def reset_cluster_stats(self) -> None: """ Resets cluster statistics of the decoder. diff --git a/src_python/ldpc/bplsd_decoder/_bplsd_decoder.pyx b/src_python/ldpc/bplsd_decoder/_bplsd_decoder.pyx index 2d01d55..0283ab2 100644 --- a/src_python/ldpc/bplsd_decoder/_bplsd_decoder.pyx +++ b/src_python/ldpc/bplsd_decoder/_bplsd_decoder.pyx @@ -7,7 +7,6 @@ from ldpc.bposd_decoder cimport OsdMethod import warnings cdef class BpLsdDecoder(BpDecoderBase): - """ A class representing a decoder that combines Belief Propagation (BP) with the Localised Statistics Decoder (LSD) algorithm. @@ -45,6 +44,7 @@ cdef class BpLsdDecoder(BpDecoderBase): lsd_method: str, optional The LSD method of the LSD algorithm applied to each cluster. Must be one of {'LSD_0', 'LSD_E', 'LSD_CS'}. By default 'LSD_0'. + Notes ----- The `BpLsdDecoder` class leverages soft information outputted by the BP decoder to guide the cluster growth @@ -101,7 +101,6 @@ cdef class BpLsdDecoder(BpDecoderBase): del self.lsd def decode(self,syndrome): - """ Decodes the input syndrome using the belief propagation and LSD decoding methods. diff --git a/src_python/ldpc/bposd_decoder/__init__.pyi b/src_python/ldpc/bposd_decoder/__init__.pyi index 31e25cf..f749615 100644 --- a/src_python/ldpc/bposd_decoder/__init__.pyi +++ b/src_python/ldpc/bposd_decoder/__init__.pyi @@ -2,11 +2,14 @@ import numpy as np import warnings from scipy.sparse import spmatrix from typing import Union, List, Optional + class BpOsdDecoder(BpDecoderBase): """ Belief propagation and Ordered Statistic Decoding (OSD) decoder for binary linear codes. + This class provides an implementation of the BP decoding that uses Ordered Statistic Decoding (OSD) as a fallback method if the BP does not converge. The class inherits from the `BpDecoderBase` class. + Parameters ---------- pcm : Union[np.ndarray, spmatrix] @@ -35,108 +38,149 @@ class BpOsdDecoder(BpDecoderBase): The OSD method used. Must be one of {'OSD_0', 'OSD_E', 'OSD_CS'}. osd_order : int, optional The OSD order, by default 0. + Notes ----- This class makes use of the C++ module `ldpc::osd::OsdDecoderCpp` for implementing the OSD decoder. The `__cinit__` method initializes this module with the parity check matrix and channel probabilities from the belief propagation decoder. The `__del__` method deallocates memory if it has been allocated. """ + + def __cinit__(self, pcm: Union[np.ndarray, spmatrix], error_rate: Optional[float] = None, + error_channel: Optional[List[float]] = None, max_iter: Optional[int] = 0, bp_method: Optional[str] = 'minimum_sum', + ms_scaling_factor: Optional[float] = 1.0, schedule: Optional[str] = 'parallel', omp_thread_count: Optional[int] = 1, + random_schedule_seed: Optional[int] = 0, serial_schedule_order: Optional[List[int]] = None, osd_method: Union[str, int, float] = 0, + osd_order: int = 0, input_vector_type: str = "syndrome", **kwargs): ... + + def __del__(self): ... + def decode(self, syndrome: np.ndarray) -> np.ndarray: """ Decodes the input syndrome using the belief propagation and OSD decoding methods. + This method takes an input syndrome and decodes it using the belief propagation (BP) decoding method. If the BP decoding method converges, it returns the decoding output. Otherwise, the method falls back to using the Ordered Statistic Decoding (OSD) decoding method. + Parameters ---------- syndrome : np.ndarray The input syndrome to decode. + Returns ------- np.ndarray A numpy array containing the decoded output. + Raises ------ ValueError If the length of the input syndrome is not equal to the length of the code. + Notes ----- This method first checks if the input syndrome is all zeros. If it is, it returns an array of zeros of the same length as the codeword. If the BP decoding method converges, it returns the decoding output. Otherwise, it falls back to using the OSD decoding method. The OSD method used is specified by the `osd_method` parameter passed to the class constructor. The OSD order used is specified by the `osd_order` parameter passed to the class constructor. + """ + + + @property def osd_method(self) -> Optional[str]: """ The Ordered Statistic Decoding (OSD) method used. + Returns ------- Optional[str] A string representing the OSD method used. Must be one of {'OSD_0', 'OSD_E', 'OSD_CS'}. If no OSD method has been set, returns `None`. """ + @osd_method.setter def osd_method(self, method: Union[str, int, float]) -> None: """ Sets the OSD method used. + Parameters ---------- method : Union[str, int, float] A string, integer or float representing the OSD method to use. Must be one of {'OSD_0', 'OSD_E', 'OSD_CS'}, corresponding to OSD order-0, OSD Exhaustive or OSD-Cominbation-Sweep. """ + + @property def osd_order(self) -> int: """ The OSD order used. + Returns ------- int An integer representing the OSD order used. """ + + @osd_order.setter def osd_order(self, order: int) -> None: """ Set the order for the OSD method. + Parameters ---------- order : int The order for the OSD method. Must be a positive integer. + Raises ------ ValueError If order is less than 0. + Warns ----- UserWarning If the OSD method is 'OSD_E' and the order is greater than 15. + """ + @property def decoding(self) -> np.ndarray: """ Returns the current decoded output. + Returns: np.ndarray: A numpy array containing the current decoded output. """ + @property def bp_decoding(self) -> np.ndarray: """ Returns the current BP decoding output. + Returns: np.ndarray: A numpy array containing the BP decoding output. """ + + + @property def osd0_decoding(self) -> np.ndarray: """ Returns the current OSD-0 decoding output. + Returns: np.ndarray: A numpy array containing the current decoded output. """ + @property def osdw_decoding(self) -> np.ndarray: """ Returns the current OSD-W decoding output. + Returns: np.ndarray: A numpy array containing the current decoded output. """ diff --git a/src_python/ldpc/mod2/__init__.pyi b/src_python/ldpc/mod2/__init__.pyi index cbe44c2..5f171ba 100644 --- a/src_python/ldpc/mod2/__init__.pyi +++ b/src_python/ldpc/mod2/__init__.pyi @@ -3,134 +3,176 @@ import scipy.sparse import ldpc.mod2._legacy_v1 import ldpc.helpers.scipy_helpers from typing import Tuple, Union, List -from libc.stdint cimport uintptr_t +from libc.stdint import uintptr_t + def csc_to_scipy_sparse(vector[vector[int]]& col_adjacency_list): """ Converts CSC matrix to sparse matrix """ + def rank(pcm: Union[scipy.sparse.spmatrix, np.ndarray], method: str = "dense") -> int: """ Calculate the rank of a given parity check matrix. + This function calculates the rank of the parity check matrix (pcm) using either a dense or sparse method. The dense method is used by default. + Parameters ---------- pcm : Union[scipy.sparse.spmatrix, np.ndarray] The parity check matrix to be ranked. method : str, optional The method to use for calculating the rank. Options are "dense" or "sparse". Defaults to "dense". + Returns ------- int The rank of the parity check matrix. """ + def nullspace(pcm: Union[scipy.sparse.spmatrix, np.ndarray], method = "dense") -> scipy.sparse.spmatrix: """ Calculate the kernel of a given parity check matrix. + Parameters: pcm (Union[scipy.sparse.spmatrix, np.ndarray]): The parity check matrix for kernel calculation. + Returns: scipy.sparse.spmatrix: The kernel of the parity check matrix. """ + def kernel(pcm: Union[scipy.sparse.spmatrix, np.ndarray], method = "dense") -> scipy.sparse.spmatrix: """ Calculate the kernel of a given parity check matrix (same as the nullspace). + Parameters: pcm (Union[scipy.sparse.spmatrix, np.ndarray]): The parity check matrix for kernel calculation. + Returns: scipy.sparse.spmatrix: The kernel of the parity check matrix. """ + def row_complement_basis(pcm: Union[scipy.sparse.spmatrix, np.ndarray]) -> scipy.sparse.spmatrix: """ Calculate the row complement basis of a given parity check matrix. + Parameters: pcm (Union[scipy.sparse.spmatrix, np.ndarray]): The parity check matrix for row complement basis calculation. + Returns: scipy.sparse.spmatrix: The row complement basis of the parity check matrix. """ + def pivot_rows(mat: Union[np.ndarray,scipy.sparse.spmatrix]): """ Find the pivot rows of a given matrix. + This function finds the pivot rows of the input matrix. The input matrix can be either a dense numpy array or a sparse scipy matrix. The function first converts the input matrix to a CSR list, then calls the C++ function `pivot_rows_cpp` to find the pivot rows. + Parameters ---------- mat : Union[np.ndarray, scipy.sparse.spmatrix] The input matrix. + Returns ------- numpy.ndarray A numpy array containing the pivot rows of the input matrix. """ + def io_test(pcm: Union[scipy.sparse.spmatrix,np.ndarray]): """ Test function """ + def estimate_code_distance(pcm: Union[scipy.sparse.spmatrix,np.ndarray], timeout_seconds: float = 0.025, number_of_words_to_save = 10): + """ Estimate the code distance of a binary matrix representing a parity-check matrix. + This function estimates the minimum distance of a code defined by the given parity-check matrix (PCM). The calculation runs until either the specified timeout is reached or an estimate is found. + Parameters ---------- pcm : Union[scipy.sparse.spmatrix, np.ndarray] The parity-check matrix representing the code, provided as a scipy sparse matrix or a numpy ndarray. + timeout_seconds : float, optional (default=0.025) The maximum time in seconds allowed for estimating the code distance. + number_of_words_to_save : int, optional (default=10) The number of minimum-weight codewords to save in the returned matrix. + Returns ------- min_distance : int The estimated minimum distance of the code. + samples_searched : int The number of samples that were searched to find the minimum distance. + min_weight_words_matrix : scipy.sparse.csr_matrix A sparse matrix containing the minimum-weight codewords found, up to `number_of_words_to_save`. """ + def row_span(pcm: Union[scipy.sparse.spmatrix,np.ndarray]) -> scipy.sparse.spmatrix: """ Compute the row span of a given parity check matrix. + This function computes the row span of the input parity check matrix (pcm). The input matrix can be either a dense numpy array or a sparse scipy matrix. The function first converts the input matrix to a CSR list, then calls the C++ function `row_span_cpp` to compute the row span. + Parameters ---------- pcm : Union[np.ndarray, scipy.sparse.spmatrix] The input parity check matrix. + Returns ------- scipy.sparse.spmatrix The row span of the input matrix. """ + def compute_exact_code_distance(pcm: Union[scipy.sparse.spmatrix,np.ndarray]): + """ Compute the exact code distance of a binary matrix representing a parity-check matrix. + This function computes the exact minimum distance of a code defined by the given parity-check matrix (PCM). Unlike the estimation function, this function guarantees the precise minimum distance, though it may be computationally intensive. + Parameters ---------- pcm : Union[scipy.sparse.spmatrix, np.ndarray] The parity-check matrix representing the code, provided as a scipy sparse matrix or a numpy ndarray. + Returns ------- distance : int The exact minimum distance of the code. """ + def row_basis(pcm: Union[scipy.sparse.spmatrix,np.ndarray]) -> scipy.sparse.spmatrix: """ Compute the row basis of a given parity check matrix. + Parameters ---------- pcm : Union[np.ndarray, scipy.sparse.spmatrix] The input parity check matrix. + Returns ------- scipy.sparse.spmatrix The row basis of the input matrix. """ + def row_echelon(matrix: Union[np.ndarray,scipy.sparse.spmatrix], full: bool = False) -> List[np.ndarray, int, np.ndarray, np.ndarray]: """ Converts a binary matrix to row echelon form via Gaussian Elimination + Parameters ---------- matrix : numpy.ndarry or scipy.sparse @@ -139,6 +181,7 @@ def row_echelon(matrix: Union[np.ndarray,scipy.sparse.spmatrix], full: bool = Fa If set to `True', Gaussian elimination is only performed on the rows below the pivot. If set to `False' Gaussian eliminatin is performed on rows above and below the pivot. + Returns ------- row_ech_form: numpy.ndarray @@ -149,6 +192,7 @@ def row_echelon(matrix: Union[np.ndarray,scipy.sparse.spmatrix], full: bool = Fa The transformation matrix such that (transform_matrix@matrix)=row_ech_form pivot_cols: list List of the indices of pivot num_cols found during Gaussian elimination + Examples -------- >>> H=np.array([[1, 1, 1],[1, 1, 1],[0, 1, 0]]) @@ -157,20 +201,25 @@ def row_echelon(matrix: Union[np.ndarray,scipy.sparse.spmatrix], full: bool = Fa [[1 1 1] [0 1 0] [0 0 0]] + >>> re_matrix=row_echelon(H,full=True)[0] >>> print(re_matrix) [[1 0 1] [0 1 0] [0 0 0]] + """ + def reduced_row_echelon(matrix: Union[np.ndarray, scipy.sparse.spmatrix]) -> List[np.ndarray, int, np.ndarray, np.ndarray]: """ Converts matrix to reduced row echelon form such that the output has the form rre=[I,A] + Parameters ---------- matrix: numpy.ndarray A binary matrix in numpy.ndarray format + Returns ------- reduced_row_echelon_from: numpy.ndarray @@ -181,6 +230,7 @@ def reduced_row_echelon(matrix: Union[np.ndarray, scipy.sparse.spmatrix]) -> Lis The transformation matrix for row permutations transform_matrix_cols: numpy.ndarray The transformation matrix for the columns + Examples -------- >>> H=np.array([[0, 0, 0, 1, 1, 1, 1],[0, 1, 1, 0, 0, 1, 1],[1, 0, 1, 0, 1, 0, 1]]) @@ -189,31 +239,46 @@ def reduced_row_echelon(matrix: Union[np.ndarray, scipy.sparse.spmatrix]) -> Lis [[1 0 0 1 1 0 1] [0 1 0 1 0 1 1] [0 0 1 0 1 1 1]] + """ + + def inverse(matrix: Union[np.ndarray, scipy.sparse.spmatrix]) -> np.ndarray: """ Computes the left inverse of a full-rank matrix. + Notes ----- + The `left inverse' is computed when the number of rows in the matrix exceeds the matrix rank. The left inverse is defined as follows:: + Inverse(M.T@M)@M.T + We can make a further simplification by noting that the row echelon form matrix with full column rank has the form:: + row_echelon_form=P@M=vstack[I,A] + In this case the left inverse simplifies to:: + Inverse(M^T@P^T@P@M)@M^T@P^T@P=M^T@P^T@P=row_echelon_form.T@P + Parameters ---------- matrix: numpy.ndarray The binary matrix to be inverted in numpy.ndarray format. This matrix must either be square full-rank or rectangular with full-column rank. + Returns ------- numpy.ndarray The inverted binary matrix + + Examples -------- + >>> # full-rank square matrix >>> mat=np.array([[1,1,0],[0,1,0],[0,0,1]]) >>> i_mat=inverse(mat) @@ -221,6 +286,7 @@ def inverse(matrix: Union[np.ndarray, scipy.sparse.spmatrix]) -> np.ndarray: [[1 0 0] [0 1 0] [0 0 1]] + >>> # full-column rank matrix >>> mat=np.array([[1,1,0],[0,1,0],[0,0,1],[0,1,1]]) >>> i_mat=inverse(mat) @@ -228,56 +294,80 @@ def inverse(matrix: Union[np.ndarray, scipy.sparse.spmatrix]) -> np.ndarray: [[1 0 0] [0 1 0] [0 0 1]] + """ + + class PluDecomposition(): """ Initialise the PLU Decomposition with a given parity check matrix. + Parameters: pcm (Union[scipy.sparse.spmatrix, np.ndarray]): The parity check matrix for PLU Decomposition. full_reduce (bool, optional): Flag to indicate if full row reduction is required. Default is False. lower_triangular (bool, optional): Flag to indicate if the result should be in lower triangular form. Default is True. """ + + def __init__(self, pcm: Union[scipy.sparse.spmatrix, np.ndarray], full_reduce: bool = False, lower_triangular: bool = True) -> None: ... + + def __cinit__(self, pcm: Union[scipy.sparse.spmatrix,np.ndarray], full_reduce: bool = False, lower_triangular: bool = True): ... + + def lu_solve(self, y: np.ndarray) -> np.ndarray: """ Solve the LU decomposition problem for a given array 'y'. + Parameters: y (np.ndarray): Array to be solved. + Returns: np.ndarray: Solution array. """ + @property def L(self) -> scipy.sparse.spmatrix: """ Get the lower triangular matrix from the LU decomposition. + Returns: scipy.sparse.spmatrix: Lower triangular matrix. """ + @property def U(self) -> scipy.sparse.spmatrix: """ Get the upper triangular matrix from the LU decomposition. + Returns: scipy.sparse.spmatrix: Upper triangular matrix. """ + @property def P(self) -> scipy.sparse.spmatrix: """ Get the permutation matrix from the LU decomposition. + Returns: scipy.sparse.spmatrix: Permutation matrix. """ + @property def rank(self) -> int: """ Get the rank of the matrix used for the LU decomposition. + Returns: int: Rank of the matrix. """ + @property def pivots(self) -> np.ndarray: """ Get the pivot positions used during the LU decomposition. + Returns: np.ndarray: Array of pivot positions. """ + + def __del__(self): ... diff --git a/src_python/ldpc/union_find_decoder/__init__.pyi b/src_python/ldpc/union_find_decoder/__init__.pyi index adec839..c341787 100644 --- a/src_python/ldpc/union_find_decoder/__init__.pyi +++ b/src_python/ldpc/union_find_decoder/__init__.pyi @@ -1,11 +1,13 @@ import numpy as np from scipy.sparse import spmatrix + class UnionFindDecoder: """ A decoder class that implements the Union Find Decoder (UFD) algorithm to decode binary linear codes. The decoder operates on a provided parity-check matrix (PCM) and can function with or without soft information from a channel. The UFD algorithm can be run in two modes: matrix solve and peeling, controlled by the `uf_method` flag. + Parameters ---------- pcm : Union[np.ndarray, spmatrix] @@ -15,9 +17,15 @@ class UnionFindDecoder: If True, the decoder operates in matrix solve mode. If False, it operates in peeling mode. Default is False. """ + + def __cinit__(self, pcm: Union[np.ndarray, spmatrix], uf_method: str = False): ... + + def __del__(self): ... + def decode(self, syndrome: np.ndarray, llrs: np.ndarray = None, bits_per_step: int = 0) -> np.ndarray: """ Decodes the given syndrome to find an estimate of the transmitted codeword. + Parameters ---------- syndrome : np.ndarray @@ -28,15 +36,18 @@ class UnionFindDecoder: bits_per_step : int, optional The number of bits to be added to clusters in each step of the decoding process. If 0, all neigbouring bits are added in one step. Default is 0. + Returns ------- np.ndarray The estimated codeword. + Raises ------ ValueError If the length of the syndrome or the length of the llrs (if provided) do not match the dimensions of the parity-check matrix. """ + @property def decoding(self): ...