From aef479989991fba84ebabddeb44f7316c4387c1a Mon Sep 17 00:00:00 2001 From: Phoenix Zerin Date: Thu, 14 Jun 2018 09:02:40 +1200 Subject: [PATCH] [#145] Reformat rest of crypto package for PEP-8. --- iota/crypto/__init__.py | 16 +- iota/crypto/addresses.py | 354 ++++++++++--------- iota/crypto/pycurl.py | 341 +++++++++--------- iota/crypto/signing.py | 724 ++++++++++++++++++++------------------- iota/crypto/types.py | 497 ++++++++++++++------------- 5 files changed, 986 insertions(+), 946 deletions(-) diff --git a/iota/crypto/__init__.py b/iota/crypto/__init__.py index 8338ae6..c76906f 100644 --- a/iota/crypto/__init__.py +++ b/iota/crypto/__init__.py @@ -1,17 +1,15 @@ # coding=utf-8 from __future__ import absolute_import, division, print_function, \ - unicode_literals - + unicode_literals # Load curl library. # If a compiled c extension is available, we will prefer to load that; # otherwise fall back to pure-Python implementation. # https://pypi.python.org/pypi/PyOTA-CCurl try: - from ccurl import * + from ccurl import * except ImportError: - from .pycurl import * - + from .pycurl import * FRAGMENT_LENGTH = 2187 """ @@ -23,7 +21,7 @@ class SeedWarning(Warning): - """ - Warning for inappropriate seeds. - """ - pass + """ + Warning for insecure or otherwise inappropriate seeds. + """ + pass diff --git a/iota/crypto/addresses.py b/iota/crypto/addresses.py index a2a5ac6..072540b 100644 --- a/iota/crypto/addresses.py +++ b/iota/crypto/addresses.py @@ -1,6 +1,6 @@ # coding=utf-8 from __future__ import absolute_import, division, print_function, \ - unicode_literals + unicode_literals from typing import Generator, Iterable, List, MutableSequence @@ -11,184 +11,198 @@ from iota.exceptions import with_context __all__ = [ - 'AddressGenerator', + 'AddressGenerator', ] class AddressGenerator(Iterable[Address]): - """ - Generates new addresses using a standard algorithm. - - Note: This class does not check if addresses have already been used; - if you want to exclude used addresses, invoke - :py:meth:`iota.api.IotaApi.get_new_addresses` instead. - - Note also that :py:meth:`iota.api.IotaApi.get_new_addresses` uses - ``AddressGenerator`` internally, so you get the best of both worlds - when you use the API (: - """ - DEFAULT_SECURITY_LEVEL = 2 - """ - Default number of iterations to use when creating digests, used to create - addresses. - - Note: this also impacts a few other things like length of transaction - signatures. - - References: - - :py:meth:`iota.transaction.ProposedBundle.sign_inputs` - - :py:class:`iota.transaction.BundleValidator` - """ - - def __init__(self, seed, security_level=DEFAULT_SECURITY_LEVEL, checksum=False): - # type: (TrytesCompatible, int, bool) -> None - super(AddressGenerator, self).__init__() - - self.security_level = security_level - self.checksum = checksum - self.seed = Seed(seed) - - def __iter__(self): - # type: () -> Generator[Address] - """ - Returns a generator for creating new addresses, starting at index - 0 and potentially continuing on forever. - """ - return self.create_iterator() - - def get_addresses(self, start, count=1, step=1): - # type: (int, int, int) -> List[Address] """ - Generates and returns one or more addresses at the specified - index(es). + Generates new addresses using a standard algorithm. - This is a one-time operation; if you want to create lots of - addresses across multiple contexts, consider invoking - :py:meth:`create_iterator` and sharing the resulting generator - object instead. + Note: This class does not check if addresses have already been used; + if you want to exclude used addresses, invoke + :py:meth:`iota.api.IotaApi.get_new_addresses` instead. - Warning: This method may take awhile to run if the starting index - and/or the number of requested addresses is a large number! - - :param start: - Starting index. - Must be >= 0. - - :param count: - Number of addresses to generate. - Must be > 0. - - :param step: - Number of indexes to advance after each address. - This may be any non-zero (positive or negative) integer. - - :return: - Always returns a list, even if only one address is generated. - - The returned list will contain ``count`` addresses, except when - ``step * count < start`` (only applies when ``step`` is - negative). + Note also that :py:meth:`iota.api.IotaApi.get_new_addresses` uses + ``AddressGenerator`` internally, so you get the best of both worlds + when you use the API (: """ - if count < 1: - raise with_context( - exc = ValueError('``count`` must be positive.'), - - context = { - 'start': start, - 'count': count, - 'step': step, - }, - ) - - if not step: - raise with_context( - exc = ValueError('``step`` must not be zero.'), - - context = { - 'start': start, - 'count': count, - 'step': step, - }, - ) - - generator = self.create_iterator(start, step) - - addresses = [] - for _ in range(count): - try: - next_addy = next(generator) - except StopIteration: - break - else: - addresses.append(next_addy) - - return addresses - - def create_iterator(self, start=0, step=1): - # type: (int, int) -> Generator[Address] + DEFAULT_SECURITY_LEVEL = 2 """ - Creates an iterator that can be used to progressively generate new - addresses. - - :param start: - Starting index. - - Warning: This method may take awhile to reset if ``start`` - is a large number! - - :param step: - Number of indexes to advance after each address. + Default number of iterations to use when creating digests, used to + create addresses. + + Note: this also impacts a few other things like length of + transaction signatures. + + References: - Warning: The generator may take awhile to advance between - iterations if ``step`` is a large number! - """ - key_iterator = ( - KeyGenerator(self.seed) - .create_iterator(start, step, self.security_level) - ) - - while True: - yield self._generate_address(key_iterator) - - @staticmethod - def address_from_digest(digest): - # type: (Digest) -> Address - """ - Generates an address from a private key digest. - """ - address_trits = [0] * (Address.LEN * TRITS_PER_TRYTE) # type: MutableSequence[int] - - sponge = Kerl() - sponge.absorb(digest.as_trits()) - sponge.squeeze(address_trits) - - return Address.from_trits( - trits = address_trits, - - key_index = digest.key_index, - security_level = digest.security_level, - ) - - def _generate_address(self, key_iterator): - # type: (KeyIterator) -> Address - """ - Generates a new address. - - Used in the event of a cache miss. - """ - if self.checksum: - return self.address_from_digest(self._get_digest(key_iterator)).with_valid_checksum() - else: - return self.address_from_digest(self._get_digest(key_iterator)) - - @staticmethod - def _get_digest(key_iterator): - # type: (KeyIterator) -> Digest + - :py:meth:`iota.transaction.ProposedBundle.sign_inputs` + - :py:class:`iota.transaction.BundleValidator` """ - Extracts parameters for :py:meth:`address_from_digest`. - Split into a separate method so that it can be mocked during unit - tests. - """ - private_key = next(key_iterator) # type: PrivateKey - return private_key.get_digest() + def __init__( + self, + seed, + security_level=DEFAULT_SECURITY_LEVEL, + checksum=False, + ): + # type: (TrytesCompatible, int, bool) -> None + super(AddressGenerator, self).__init__() + + self.security_level = security_level + self.checksum = checksum + self.seed = Seed(seed) + + def __iter__(self): + # type: () -> Generator[Address, None, None] + """ + Returns a generator for creating new addresses, starting at + index 0 and potentially continuing on forever. + """ + return self.create_iterator() + + def get_addresses(self, start, count=1, step=1): + # type: (int, int, int) -> List[Address] + """ + Generates and returns one or more addresses at the specified + index(es). + + This is a one-time operation; if you want to create lots of + addresses across multiple contexts, consider invoking + :py:meth:`create_iterator` and sharing the resulting generator + object instead. + + Warning: This method may take awhile to run if the starting + index and/or the number of requested addresses is a large + number! + + :param start: + Starting index. + Must be >= 0. + + :param count: + Number of addresses to generate. + Must be > 0. + + :param step: + Number of indexes to advance after each address. + This may be any non-zero (positive or negative) integer. + + :return: + Always returns a list, even if only one address is generated. + + The returned list will contain ``count`` addresses, except + when ``step * count < start`` (only applies when ``step`` is + negative). + """ + if count < 1: + raise with_context( + exc=ValueError('``count`` must be positive.'), + + context={ + 'start': start, + 'count': count, + 'step': step, + }, + ) + + if not step: + raise with_context( + exc=ValueError('``step`` must not be zero.'), + + context={ + 'start': start, + 'count': count, + 'step': step, + }, + ) + + generator = self.create_iterator(start, step) + + addresses = [] + for _ in range(count): + try: + next_addy = next(generator) + except StopIteration: + break + else: + addresses.append(next_addy) + + return addresses + + def create_iterator(self, start=0, step=1): + # type: (int, int) -> Generator[Address] + """ + Creates an iterator that can be used to progressively generate new + addresses. + + :param start: + Starting index. + + Warning: This method may take awhile to reset if ``start`` + is a large number! + + :param step: + Number of indexes to advance after each address. + + Warning: The generator may take awhile to advance between + iterations if ``step`` is a large number! + """ + key_iterator = ( + KeyGenerator(self.seed).create_iterator( + start, + step, + self.security_level, + ) + ) + + while True: + yield self._generate_address(key_iterator) + + @staticmethod + def address_from_digest(digest): + # type: (Digest) -> Address + """ + Generates an address from a private key digest. + """ + address_trits = [0] * (Address.LEN * TRITS_PER_TRYTE) # type: List[int] + + sponge = Kerl() + sponge.absorb(digest.as_trits()) + sponge.squeeze(address_trits) + + return Address.from_trits( + trits=address_trits, + + key_index=digest.key_index, + security_level=digest.security_level, + ) + + def _generate_address(self, key_iterator): + # type: (KeyIterator) -> Address + """ + Generates a new address. + + Used in the event of a cache miss. + """ + if self.checksum: + return ( + self.address_from_digest( + digest=self._get_digest(key_iterator), + ).with_valid_checksum() + ) + else: + return self.address_from_digest(self._get_digest(key_iterator)) + + @staticmethod + def _get_digest(key_iterator): + # type: (KeyIterator) -> Digest + """ + Extracts parameters for :py:meth:`address_from_digest`. + + Split into a separate method so that it can be mocked during + unit tests. + """ + private_key = next(key_iterator) # type: PrivateKey + return private_key.get_digest() diff --git a/iota/crypto/pycurl.py b/iota/crypto/pycurl.py index 1748175..48074c2 100644 --- a/iota/crypto/pycurl.py +++ b/iota/crypto/pycurl.py @@ -1,17 +1,16 @@ # coding=utf-8 from __future__ import absolute_import, division, print_function, \ - unicode_literals + unicode_literals from typing import List, MutableSequence, Optional, Sequence from iota.exceptions import with_context __all__ = [ - 'Curl', - 'HASH_LENGTH', + 'Curl', + 'HASH_LENGTH', ] - HASH_LENGTH = 243 """ Number of trits in a hash. @@ -44,172 +43,176 @@ class Curl(object): - """ - Python implementation of Curl. - - **IMPORTANT: Not thread-safe!** - """ - def __init__(self): - # type: (Optional[Sequence[int]]) -> None - self.reset() - - # noinspection PyAttributeOutsideInit - def reset(self): - # type: () -> None - """ - Resets internal state. - """ - self._state = [0] * STATE_LENGTH # type: List[int] - - def absorb(self, trits, offset=0, length=None): - # type: (Sequence[int], Optional[int], Optional[int]) -> None """ - Absorb trits into the sponge. - - :param trits: - Sequence of trits to absorb. - - :param offset: - Starting offset in ``trits``. + Python implementation of Curl. - :param length: - Number of trits to absorb. Defaults to ``len(trits)``. + **IMPORTANT: Not thread-safe!** """ - pad = ((len(trits) % HASH_LENGTH) or HASH_LENGTH) - trits += [0] * (HASH_LENGTH - pad) - - if length is None: - length = len(trits) - - if length < 1: - raise with_context( - exc = ValueError('Invalid length passed to ``absorb``.'), - context = { - 'trits': trits, - 'offset': offset, - 'length': length, - }, - ) - - # Copy trits from ``trits`` into internal state, one hash at a - # time, transforming internal state in between hashes. - while offset < length: - start = offset - stop = min(start + HASH_LENGTH, length) - - # - # Copy the next hash worth of trits to internal state. - # - # Note that we always copy the trits to the start of the state. - # ``self._state`` is 3 hashes long, but only the first hash is - # "public"; the other 2 are only accessible to - # :py:meth:`_transform`. - # - self._state[0:stop-start] = trits[start:stop] - - # Transform. - self._transform() - - # Move on to the next hash. - offset += HASH_LENGTH - - def squeeze(self, trits, offset=0, length=HASH_LENGTH): - # type: (MutableSequence[int], Optional[int], Optional[int]) -> None - """ - Squeeze trits from the sponge. - - :param trits: - Sequence that the squeezed trits will be copied to. - Note: this object will be modified! - - :param offset: - Starting offset in ``trits``. - :param length: - Number of trits to squeeze, default to ``HASH_LENGTH`` - """ - # - # Squeeze is kind of like the opposite of absorb; it copies trits - # from internal state to the ``trits`` parameter, one hash at a - # time, and transforming internal state in between hashes. - # - # However, only the first hash of the state is "public", so we - # can simplify the implementation somewhat. - # - - # Ensure length can be mod by HASH_LENGTH - if length % HASH_LENGTH != 0: - raise with_context( - exc = ValueError('Invalid length passed to ``squeeze`.'), - context = { - 'trits': trits, - 'offset': offset, - 'length': length, - }) - - # Ensure that ``trits`` can hold at least one hash worth of trits. - trits.extend([0] * max(0, length - len(trits))) - - # Check trits with offset can handle hash length - if len(trits) - offset < HASH_LENGTH: - raise with_context( - exc = ValueError('Invalid offset passed to ``squeeze``.'), - context = { - 'trits': trits, - 'offset': offset, - 'length': length - }, - ) - - while length >= HASH_LENGTH: - # Copy exactly one hash. - trits[offset:offset + HASH_LENGTH] = self._state[0:HASH_LENGTH] - - # One hash worth of trits copied; now transform. - self._transform() - - offset += HASH_LENGTH - length -= HASH_LENGTH - - def _transform(self): - # type: () -> None - """ - Transforms internal state. - """ - # Copy some values locally so we can avoid global lookups in the - # inner loop. - # - # References: - # - https://wiki.python.org/moin/PythonSpeed/PerformanceTips#Local_Variables - state_length = STATE_LENGTH - truth_table = TRUTH_TABLE - - # Operate on a copy of ``self._state`` to eliminate dot lookups in - # the inner loop. - # - # References: - # - https://wiki.python.org/moin/PythonSpeed/PerformanceTips#Avoiding_dots... - # - http://stackoverflow.com/a/2612990/ - prev_state = self._state[:] - new_state = prev_state[:] - - # Note: This code looks significantly different from the C - # implementation because it has been optimized to limit the number - # of list item lookups (these are relatively slow in Python). - index = 0 - for _ in range(NUMBER_OF_ROUNDS): - prev_trit = prev_state[index] - - for pos in range(state_length): - index += (364 if index < 365 else -365) - - new_trit = prev_state[index] - - new_state[pos] = truth_table[prev_trit + (3 * new_trit) + 4] - - prev_trit = new_trit - - prev_state = new_state - new_state = new_state[:] - - self._state = new_state + def __init__(self): + # type: (Optional[Sequence[int]]) -> None + self.reset() + + # noinspection PyAttributeOutsideInit + def reset(self): + # type: () -> None + """ + Resets internal state. + """ + self._state = [0] * STATE_LENGTH # type: List[int] + + def absorb(self, trits, offset=0, length=None): + # type: (Sequence[int], Optional[int], Optional[int]) -> None + """ + Absorb trits into the sponge. + + :param trits: + Sequence of trits to absorb. + + :param offset: + Starting offset in ``trits``. + + :param length: + Number of trits to absorb. Defaults to ``len(trits)``. + """ + pad = ((len(trits) % HASH_LENGTH) or HASH_LENGTH) + trits += [0] * (HASH_LENGTH - pad) + + if length is None: + length = len(trits) + + if length < 1: + raise with_context( + exc=ValueError('Invalid length passed to ``absorb``.'), + + context={ + 'trits': trits, + 'offset': offset, + 'length': length, + }, + ) + + # Copy trits from ``trits`` into internal state, one hash at a + # time, transforming internal state in between hashes. + while offset < length: + start = offset + stop = min(start + HASH_LENGTH, length) + + # Copy the next hash worth of trits to internal state. + # + # Note that we always copy the trits to the start of the + # state. ``self._state`` is 3 hashes long, but only the + # first hash is "public"; the other 2 are only accessible to + # :py:meth:`_transform`. + self._state[0:stop - start] = trits[start:stop] + + # Transform. + self._transform() + + # Move on to the next hash. + offset += HASH_LENGTH + + def squeeze(self, trits, offset=0, length=HASH_LENGTH): + # type: (MutableSequence[int], Optional[int], Optional[int]) -> None + """ + Squeeze trits from the sponge. + + :param trits: + Sequence that the squeezed trits will be copied to. + Note: this object will be modified! + + :param offset: + Starting offset in ``trits``. + + :param length: + Number of trits to squeeze, default to ``HASH_LENGTH`` + """ + # Squeeze is kind of like the opposite of absorb; it copies + # trits from internal state to the ``trits`` parameter, one hash + # at a time, and transforming internal state in between hashes. + # + # However, only the first hash of the state is "public", so we + # can simplify the implementation somewhat. + + # Ensure length can be mod by HASH_LENGTH + if length % HASH_LENGTH != 0: + raise with_context( + exc=ValueError('Invalid length passed to ``squeeze`.'), + + context={ + 'trits': trits, + 'offset': offset, + 'length': length, + }) + + # Ensure that ``trits`` can hold at least one hash worth of + # trits. + trits.extend([0] * max(0, length - len(trits))) + + # Check trits with offset can handle hash length + if len(trits) - offset < HASH_LENGTH: + raise with_context( + exc=ValueError('Invalid offset passed to ``squeeze``.'), + + context={ + 'trits': trits, + 'offset': offset, + 'length': length + }, + ) + + while length >= HASH_LENGTH: + # Copy exactly one hash. + trits[offset:offset + HASH_LENGTH] = self._state[0:HASH_LENGTH] + + # One hash worth of trits copied; now transform. + self._transform() + + offset += HASH_LENGTH + length -= HASH_LENGTH + + def _transform(self): + # type: () -> None + """ + Transforms internal state. + """ + # Copy some values locally so we can avoid global lookups in the + # inner loop. + # + # References: + # + # - https://wiki.python.org/moin/PythonSpeed/PerformanceTips#Local_Variables + state_length = STATE_LENGTH + truth_table = TRUTH_TABLE + + # Operate on a copy of ``self._state`` to eliminate dot lookups + # in the inner loop. + # + # References: + # + # - https://wiki.python.org/moin/PythonSpeed/PerformanceTips#Avoiding_dots... + # - http://stackoverflow.com/a/2612990/ + prev_state = self._state[:] + new_state = prev_state[:] + + # Note: This code looks significantly different from the C + # implementation because it has been optimized to limit the + # number of list item lookups (these are relatively slow in + # Python). + index = 0 + for _ in range(NUMBER_OF_ROUNDS): + prev_trit = prev_state[index] + + for pos in range(state_length): + index += (364 if index < 365 else -365) + + new_trit = prev_state[index] + + new_state[pos] = truth_table[prev_trit + (3 * new_trit) + 4] + + prev_trit = new_trit + + prev_state = new_state + new_state = new_state[:] + + self._state = new_state diff --git a/iota/crypto/signing.py b/iota/crypto/signing.py index d23548e..da01ad4 100644 --- a/iota/crypto/signing.py +++ b/iota/crypto/signing.py @@ -1,8 +1,8 @@ # coding=utf-8 from __future__ import absolute_import, division, print_function, \ - unicode_literals + unicode_literals -from typing import Iterator, List, MutableSequence, Sequence, Tuple +from typing import Iterator, List, Sequence from six import PY2 @@ -14,431 +14,445 @@ from iota.trits import add_trits, trits_from_int __all__ = [ - 'KeyGenerator', - 'SignatureFragmentGenerator', - 'validate_signature_fragments', + 'KeyGenerator', + 'SignatureFragmentGenerator', + 'validate_signature_fragments', ] def normalize(hash_): - # type: (Hash) -> List[List[int]] - """ - "Normalizes" a hash, converting it into a sequence of integers - (not trits!) suitable for use in signature generation/validation. - - The hash is divided up into 3 parts, each of which is "balanced" (sum - of all the values is equal to zero). - """ - normalized = [] - source = hash_.as_integers() + # type: (Hash) -> List[List[int]] + """ + "Normalizes" a hash, converting it into a sequence of integers + (not trits!) suitable for use in signature generation/validation. - chunk_size = 27 + The hash is divided up into 3 parts, each of which is "balanced" + (sum of all the values is equal to zero). + """ + normalized = [] + source = hash_.as_integers() - for i in range(Hash.LEN // chunk_size): - start = i * chunk_size - stop = start + chunk_size + chunk_size = 27 - chunk = source[start:stop] - chunk_sum = sum(chunk) + for i in range(Hash.LEN // chunk_size): + start = i * chunk_size + stop = start + chunk_size - while chunk_sum > 0: - chunk_sum -= 1 - for j in range(chunk_size): - if chunk[j] > -13: - chunk[j] -= 1 - break + chunk = source[start:stop] + chunk_sum = sum(chunk) + while chunk_sum > 0: + chunk_sum -= 1 + for j in range(chunk_size): + if chunk[j] > -13: + chunk[j] -= 1 + break - while chunk_sum < 0: - chunk_sum += 1 - for j in range(chunk_size): - if chunk[j] < 13: - chunk[j] += 1 - break + while chunk_sum < 0: + chunk_sum += 1 + for j in range(chunk_size): + if chunk[j] < 13: + chunk[j] += 1 + break - normalized.append(chunk) + normalized.append(chunk) - return normalized + return normalized class KeyGenerator(object): - """ - Generates signing keys for messages. - """ - def __init__(self, seed): - # type: (TrytesCompatible) -> None - super(KeyGenerator, self).__init__() + """ + Generates signing keys for messages. + """ + + def __init__(self, seed): + # type: (TrytesCompatible) -> None + super(KeyGenerator, self).__init__() + + self.seed = Seed(seed) + + def get_key(self, index, iterations): + # type: (int, int) -> PrivateKey + """ + Generates a single key. + + :param index: + The key index. + + :param iterations: + Number of transform iterations to apply to the key, also + known as security level. + Must be >= 1. + + Increasing this value makes key generation slower, but more + resistant to brute-forcing. + """ + return ( + self.get_keys( + start=index, + count=1, + step=1, + iterations=iterations, + )[0] + ) + + def get_key_for(self, address): + """ + Generates the key associated with the specified address. + + Note that this method will generate the wrong key if the input + address was generated from a different key! + """ + return self.get_key( + index=address.key_index, + iterations=address.security_level, + ) + + def get_keys(self, start, count=1, step=1, iterations=1): + # type: (int, int, int, int) -> List[PrivateKey] + """ + Generates and returns one or more keys at the specified + index(es). + + This is a one-time operation; if you want to create lots of keys + across multiple contexts, consider invoking + :py:meth:`create_iterator` and sharing the resulting generator + object instead. + + Warning: This method may take awhile to run if the starting + index and/or the number of requested keys is a large number! + + :param start: + Starting index. + Must be >= 0. + + :param count: + Number of keys to generate. + Must be > 0. + + :param step: + Number of indexes to advance after each key. + This may be any non-zero (positive or negative) integer. + + :param iterations: + Number of transform iterations to apply to each key, also + known as security level. + Must be >= 1. + + Increasing this value makes key generation slower, but more + resistant to brute-forcing. + + :return: + Always returns a list, even if only one key is generated. + + The returned list will contain ``count`` keys, except when + ``step * count < start`` (only applies when ``step`` is + negative). + """ + if count < 1: + raise with_context( + exc=ValueError('``count`` must be positive.'), + + context={ + 'start': start, + 'count': count, + 'step': step, + 'iterations': iterations, + }, + ) + + if not step: + raise with_context( + exc=ValueError('``step`` must not be zero.'), + + context={ + 'start': start, + 'count': count, + 'step': step, + 'iterations': iterations, + }, + ) + + iterator = self.create_iterator(start, step, iterations) + + keys = [] + for _ in range(count): + try: + next_key = next(iterator) + except StopIteration: + break + else: + keys.append(next_key) + + return keys + + def create_iterator(self, start=0, step=1, security_level=1): + # type: (int, int, int) -> KeyIterator + """ + Creates a generator that can be used to progressively generate + new keys. + + :param start: + Starting index. + + Warning: This method may take awhile to reset if ``start`` + is a large number! + + :param step: + Number of indexes to advance after each key. + + This value can be negative; the generator will exit if it + reaches an index < 0. + + Warning: The generator may take awhile to advance between + iterations if ``step`` is a large number! + + :param security_level: + Number of _transform iterations to apply to each key. + Must be >= 1. + + Increasing this value makes key generation slower, but more + resistant to brute-forcing. + """ + return KeyIterator(self.seed, start, step, security_level) - self.seed = Seed(seed) - def get_key(self, index, iterations): - # type: (int, int) -> PrivateKey +class KeyIterator(Iterator[PrivateKey]): + """ + Creates PrivateKeys from a set of iteration parameters. """ - Generates a single key. - :param index: - The key index. + def __init__(self, seed, start, step, security_level): + # type: (Seed, int, int, int) -> None + super(KeyIterator, self).__init__() + + if start < 0: + raise with_context( + exc=ValueError('``start`` cannot be negative.'), + + context={ + 'start': start, + 'step': step, + 'security_level': security_level, + }, + ) - :param iterations: - Number of transform iterations to apply to the key, also known - as security level. - Must be >= 1. + if security_level < 1: + raise with_context( + exc=ValueError('``security_level`` must be >= 1.'), - Increasing this value makes key generation slower, but more - resistant to brute-forcing. - """ - return self.get_keys(start=index, count=1, step=1, iterations=iterations)[0] + context={ + 'start': start, + 'step': step, + 'security_level': security_level, + }, + ) - def get_key_for(self, address): - """ - Generates the key associated with the specified address. + # In order to work correctly, the seed must be padded so that it + # is a multiple of 81 trytes. + seed += b'9' * (Hash.LEN - ((len(seed) % Hash.LEN) or Hash.LEN)) - Note that this method will generate the wrong key if the input - address was generated from a different key! - """ - return self.get_key( - index = address.key_index, - iterations = address.security_level, - ) + self.security_level = security_level + self.seed_as_trits = seed.as_trits() + self.start = start + self.step = step - def get_keys(self, start, count=1, step=1, iterations=1): - # type: (int, int, int, int) -> List[PrivateKey] - """ - Generates and returns one or more keys at the specified index(es). + self.current = self.start - This is a one-time operation; if you want to create lots of keys - across multiple contexts, consider invoking - :py:meth:`create_iterator` and sharing the resulting generator - object instead. + self.fragment_length = FRAGMENT_LENGTH * TRITS_PER_TRYTE + self.hashes_per_fragment = FRAGMENT_LENGTH // Hash.LEN - Warning: This method may take awhile to run if the starting index - and/or the number of requested keys is a large number! + def __iter__(self): + # type: () -> KeyIterator + return self - :param start: - Starting index. - Must be >= 0. + def __next__(self): + # type: () -> PrivateKey + while self.current >= 0: + sponge = self._create_sponge(self.current) - :param count: - Number of keys to generate. - Must be > 0. + key = [0] * (self.fragment_length * self.security_level) + buffer = [0] * len(self.seed_as_trits) - :param step: - Number of indexes to advance after each key. - This may be any non-zero (positive or negative) integer. + for fragment_seq in range(self.security_level): + # Squeeze trits from the buffer and append them to the + # key, one hash at a time. + for hash_seq in range(self.hashes_per_fragment): + sponge.squeeze(buffer) - :param iterations: - Number of transform iterations to apply to each key, also known - as security level. - Must be >= 1. + key_start = ( + (fragment_seq * self.fragment_length) + + (hash_seq * HASH_LENGTH) + ) - Increasing this value makes key generation slower, but more - resistant to brute-forcing. + key_stop = key_start + HASH_LENGTH - :return: - Always returns a list, even if only one key is generated. + # Ensure we only capture one hash from the buffer, + # in case it is longer than that (i.e., if the seed + # is longer than 81 trytes). + key[key_start:key_stop] = buffer[0:HASH_LENGTH] - The returned list will contain ``count`` keys, except when - ``step * count < start`` (only applies when ``step`` is - negative). - """ - if count < 1: - raise with_context( - exc = ValueError('``count`` must be positive.'), - - context = { - 'start': start, - 'count': count, - 'step': step, - 'iterations': iterations, - }, - ) - - if not step: - raise with_context( - exc = ValueError('``step`` must not be zero.'), - - context = { - 'start': start, - 'count': count, - 'step': step, - 'iterations': iterations, - }, - ) - - iterator = self.create_iterator(start, step, iterations) - - keys = [] - for _ in range(count): - try: - next_key = next(iterator) - except StopIteration: - break - else: - keys.append(next_key) - - return keys - - def create_iterator(self, start=0, step=1, security_level=1): - # type: (int, int, int) -> KeyIterator - """ - Creates a generator that can be used to progressively generate new - keys. + private_key = PrivateKey.from_trits( + key_index=self.current, + security_level=self.security_level, + trits=key, + ) - :param start: - Starting index. + self.advance() - Warning: This method may take awhile to reset if ``start`` - is a large number! + return private_key - :param step: - Number of indexes to advance after each key. + if PY2: + next = __next__ - This value can be negative; the generator will exit if it - reaches an index < 0. + def advance(self): + """ + Advances the generator without creating a key. + """ + self.current += self.step - Warning: The generator may take awhile to advance between - iterations if ``step`` is a large number! + def _create_sponge(self, index): + # type: (int) -> Kerl + """ + Prepares the hash sponge for the generator. + """ + seed = self.seed_as_trits[:] - :param security_level: - Number of _transform iterations to apply to each key. - Must be >= 1. + sponge = Kerl() + sponge.absorb(add_trits(seed, trits_from_int(index))) - Increasing this value makes key generation slower, but more - resistant to brute-forcing. - """ - return KeyIterator(self.seed, start, step, security_level) + # Squeeze all of the trits out of the sponge and re-absorb them. + # Note that the sponge transforms several times per operation, + # so this sequence is not as redundant as it looks at first + # glance. + sponge.squeeze(seed) + sponge.reset() + sponge.absorb(seed) + + return sponge -class KeyIterator(Iterator[PrivateKey]): - """ - Creates PrivateKeys from a set of iteration parameters. - """ - def __init__(self, seed, start, step, security_level): - # type: (Seed, int, int, int) -> None - super(KeyIterator, self).__init__() - - if start < 0: - raise with_context( - exc = ValueError('``start`` cannot be negative.'), - - context = { - 'start': start, - 'step': step, - 'security_level': security_level, - }, - ) - - if security_level < 1: - raise with_context( - exc = ValueError('``security_level`` must be >= 1.'), - - context = { - 'start': start, - 'step': step, - 'security_level': security_level, - }, - ) - - # In order to work correctly, the seed must be padded so that it is - # a multiple of 81 trytes. - seed += b'9' * (Hash.LEN - ((len(seed) % Hash.LEN) or Hash.LEN)) - - self.security_level = security_level - self.seed_as_trits = seed.as_trits() - self.start = start - self.step = step - - self.current = self.start - - self.fragment_length = FRAGMENT_LENGTH * TRITS_PER_TRYTE - self.hashes_per_fragment = FRAGMENT_LENGTH // Hash.LEN - - def __iter__(self): - # type: () -> KeyIterator - return self - - def __next__(self): - # type: () -> PrivateKey - while self.current >= 0: - sponge = self._create_sponge(self.current) - - key = [0] * (self.fragment_length * self.security_level) - buffer = [0] * len(self.seed_as_trits) - - for fragment_seq in range(self.security_level): - # Squeeze trits from the buffer and append them to the key, one - # hash at a time. - for hash_seq in range(self.hashes_per_fragment): - sponge.squeeze(buffer) - - key_start =\ - (fragment_seq * self.fragment_length) + (hash_seq * HASH_LENGTH) - - key_stop = key_start + HASH_LENGTH - - # Ensure we only capture one hash from the buffer, in case - # it is longer than that (i.e., if the seed is longer than 81 - # trytes). - key[key_start:key_stop] = buffer[0:HASH_LENGTH] - - private_key =\ - PrivateKey.from_trits( - key_index = self.current, - security_level = self.security_level, - trits = key, - ) # type: PrivateKey - - self.advance() - - return private_key - - if PY2: - next = __next__ - - def advance(self): - """ - Advances the generator without creating a key. +class SignatureFragmentGenerator(Iterator[TryteString]): """ - self.current += self.step + Used to generate signature fragments progressively. - def _create_sponge(self, index): - # type: (int) -> Kerl + Each instance can generate 1 signature per fragment in the private + key. """ - Prepares the hash sponge for the generator. - """ - seed = self.seed_as_trits[:] - sponge = Kerl() - sponge.absorb(add_trits(seed, trits_from_int(index))) + def __init__(self, private_key, hash_): + # type: (PrivateKey, Hash) -> None + super(SignatureFragmentGenerator, self).__init__() - # Squeeze all of the trits out of the sponge and re-absorb them. - # Note that the sponge transforms several times per operation, so - # this sequence is not as redundant as it looks at first glance. - sponge.squeeze(seed) - sponge.reset() - sponge.absorb(seed) + self._key_chunks = private_key.iter_chunks(FRAGMENT_LENGTH) + self._iteration = -1 + self._normalized_hash = normalize(hash_) + self._sponge = Kerl() - return sponge + def __iter__(self): + # type: () -> SignatureFragmentGenerator + return self + def __len__(self): + # type: () -> int + """ + Returns the number of fragments this generator can create. -class SignatureFragmentGenerator(Iterator[TryteString]): - """ - Used to generate signature fragments progressively. - - Each instance can generate 1 signature per fragment in the private - key. - """ - def __init__(self, private_key, hash_): - # type: (PrivateKey, Hash) -> None - super(SignatureFragmentGenerator, self).__init__() - - self._key_chunks = private_key.iter_chunks(FRAGMENT_LENGTH) - self._iteration = -1 - self._normalized_hash = normalize(hash_) - self._sponge = Kerl() - - def __iter__(self): - # type: () -> SignatureFragmentGenerator - return self - - def __len__(self): - # type: () -> int - """ - Returns the number of fragments this generator can create. + Note: This method always returns the same result, no matter how + many iterations have been completed. + """ + return len(self._key_chunks) - Note: This method always returns the same result, no matter how - many iterations have been completed. - """ - return len(self._key_chunks) - - def __next__(self): - # type: () -> TryteString - """ - Returns the next signature fragment. - """ - key_trytes = next(self._key_chunks) # type: TryteString - self._iteration += 1 + def __next__(self): + # type: () -> TryteString + """ + Returns the next signature fragment. + """ + key_trytes = next(self._key_chunks) # type: TryteString + self._iteration += 1 - # If the key is long enough, loop back around to the start. - normalized_chunk =\ - self._normalized_hash[self._iteration % len(self._normalized_hash)] + # If the key is long enough, loop back around to the start. + normalized_chunk = ( + self._normalized_hash[self._iteration % len(self._normalized_hash)] + ) - signature_fragment = key_trytes.as_trits() + signature_fragment = key_trytes.as_trits() - # Build the signature, one hash at a time. - for i in range(key_trytes.count_chunks(Hash.LEN)): - hash_start = i * HASH_LENGTH - hash_end = hash_start + HASH_LENGTH + # Build the signature, one hash at a time. + for i in range(key_trytes.count_chunks(Hash.LEN)): + hash_start = i * HASH_LENGTH + hash_end = hash_start + HASH_LENGTH - buffer = signature_fragment[hash_start:hash_end] # type: MutableSequence[int] + buffer = signature_fragment[hash_start:hash_end] # type: List[int] - for _ in range(13 - normalized_chunk[i]): - self._sponge.reset() - self._sponge.absorb(buffer) - self._sponge.squeeze(buffer) + for _ in range(13 - normalized_chunk[i]): + self._sponge.reset() + self._sponge.absorb(buffer) + self._sponge.squeeze(buffer) - signature_fragment[hash_start:hash_end] = buffer + signature_fragment[hash_start:hash_end] = buffer - return TryteString.from_trits(signature_fragment) + return TryteString.from_trits(signature_fragment) - if PY2: - next = __next__ + if PY2: + next = __next__ def validate_signature_fragments( - fragments, - hash_, - public_key, - sponge_type = Kerl, + fragments, + hash_, + public_key, + sponge_type=Kerl, ): - # type: (Sequence[TryteString], Hash, TryteString, type) -> bool - """ - Returns whether a sequence of signature fragments is valid. + # type: (Sequence[TryteString], Hash, TryteString, type) -> bool + """ + Returns whether a sequence of signature fragments is valid. - :param fragments: - Sequence of signature fragments (usually - :py:class:`iota.transaction.Fragment` instances). + :param fragments: + Sequence of signature fragments (usually + :py:class:`iota.transaction.Fragment` instances). - :param hash_: - Hash used to generate the signature fragments (usually a - :py:class:`iota.transaction.BundleHash` instance). + :param hash_: + Hash used to generate the signature fragments (usually a + :py:class:`iota.transaction.BundleHash` instance). - :param public_key: - The public key value used to verify the signature digest (usually a - :py:class:`iota.types.Address` instance). + :param public_key: + The public key value used to verify the signature digest (usually a + :py:class:`iota.types.Address` instance). - :param sponge_type: - The class used to create the cryptographic sponge (i.e., Curl or Kerl). - """ - checksum = [0] * (HASH_LENGTH * len(fragments)) - normalized_hash = normalize(hash_) + :param sponge_type: + The class used to create the cryptographic sponge (i.e., Curl or Kerl). + """ + checksum = [0] * (HASH_LENGTH * len(fragments)) + normalized_hash = normalize(hash_) - for (i, fragment) in enumerate(fragments): # type: Tuple[int, TryteString] - outer_sponge = sponge_type() + for i, fragment in enumerate(fragments): + outer_sponge = sponge_type() - # If there are more than 3 iterations, loop back around to the - # start. - normalized_chunk = normalized_hash[i % len(normalized_hash)] + # If there are more than 3 iterations, loop back around to the + # start. + normalized_chunk = normalized_hash[i % len(normalized_hash)] - buffer = [] - for (j, hash_trytes) in enumerate(fragment.iter_chunks(Hash.LEN)): # type: Tuple[int, TryteString] - buffer = hash_trytes.as_trits() # type: MutableSequence[int] - inner_sponge = sponge_type() + buffer = [] + for j, hash_trytes in enumerate(fragment.iter_chunks(Hash.LEN)): + buffer = hash_trytes.as_trits() # type: List[int] + inner_sponge = sponge_type() - # Note the sign flip compared to ``SignatureFragmentGenerator``. - for _ in range(13 + normalized_chunk[j]): - inner_sponge.reset() - inner_sponge.absorb(buffer) - inner_sponge.squeeze(buffer) + # Note the sign flip compared to + # :py;class:`SignatureFragmentGenerator`. + for _ in range(13 + normalized_chunk[j]): + inner_sponge.reset() + inner_sponge.absorb(buffer) + inner_sponge.squeeze(buffer) - outer_sponge.absorb(buffer) + outer_sponge.absorb(buffer) - outer_sponge.squeeze(buffer) - checksum[i*HASH_LENGTH:(i+1)*HASH_LENGTH] = buffer + outer_sponge.squeeze(buffer) + checksum[i * HASH_LENGTH:(i + 1) * HASH_LENGTH] = buffer - actual_public_key = [0] * HASH_LENGTH # type: MutableSequence[int] - addy_sponge = sponge_type() - addy_sponge.absorb(checksum) - addy_sponge.squeeze(actual_public_key) + actual_public_key = [0] * HASH_LENGTH + addy_sponge = sponge_type() + addy_sponge.absorb(checksum) + addy_sponge.squeeze(actual_public_key) - return actual_public_key == public_key.as_trits() + return actual_public_key == public_key.as_trits() diff --git a/iota/crypto/types.py b/iota/crypto/types.py index 0bf33c4..6b2664b 100644 --- a/iota/crypto/types.py +++ b/iota/crypto/types.py @@ -1,9 +1,9 @@ # coding=utf-8 from __future__ import absolute_import, division, print_function, \ - unicode_literals + unicode_literals import warnings -from typing import MutableSequence, Optional, Tuple +from typing import Optional from iota.crypto import FRAGMENT_LENGTH, HASH_LENGTH, SeedWarning from iota.crypto.kerl import Kerl @@ -12,274 +12,285 @@ from iota.types import Hash, TryteString, TrytesCompatible __all__ = [ - 'Digest', - 'PrivateKey', - 'Seed', + 'Digest', + 'PrivateKey', + 'Seed', ] class Digest(TryteString): - """ - A private key digest. Basically the same thing as a regular - :py:class:`TryteString`, except that it (usually) has a key index - associated with it. - - Note: in a few cases (e.g., generating multisig addresses), a key - index is not necessary/available. - """ - def __init__(self, trytes, key_index=None): - # type: (TrytesCompatible, Optional[int]) -> None - super(Digest, self).__init__(trytes) - - # A digest is a series of hashes; its length should reflect that. - if len(self) % Hash.LEN: - raise with_context( - exc = ValueError( - 'Length of {cls} values must be a multiple of {len} trytes.'.format( - cls = type(self).__name__, - len = Hash.LEN, - ), - ), - - context = { - 'trytes': trytes, - }, - ) - - self.key_index = key_index - - @property - def security_level(self): - # type: () -> int """ - Returns the number of iterations that were used to generate this - digest (also known as "security level"). + A private key digest. Basically the same thing as a regular + :py:class:`TryteString`, except that it (usually) has a key index + associated with it. + + Note: in a few cases (e.g., generating multisig addresses), a key + index is not necessary/available. """ - return len(self) // Hash.LEN - def as_json_compatible(self): - # type: () -> dict - return { - 'trytes': self._trytes.decode('ascii'), - 'key_index': self.key_index, - } + def __init__(self, trytes, key_index=None): + # type: (TrytesCompatible, Optional[int]) -> None + super(Digest, self).__init__(trytes) + + # A digest is a series of hashes; its length should reflect + # that. + if len(self) % Hash.LEN: + raise with_context( + exc=ValueError( + 'Length of {cls} values ' + 'must be a multiple of {len} trytes.'.format( + cls=type(self).__name__, + len=Hash.LEN, + ), + ), + + context={ + 'trytes': trytes, + }, + ) + + self.key_index = key_index + + @property + def security_level(self): + # type: () -> int + """ + Returns the number of iterations that were used to generate this + digest (also known as "security level"). + """ + return len(self) // Hash.LEN + + def as_json_compatible(self): + # type: () -> dict + return { + 'trytes': self._trytes.decode('ascii'), + 'key_index': self.key_index, + } class Seed(TryteString): - """ - A TryteString that acts as a seed for crypto functions. - - Note: This class is identical to :py:class:`TryteString`, but it has - a distinct type so that seeds can be identified in Python code. - - IMPORTANT: For maximum security, a seed must be EXACTLY 81 trytes! - - WARNINGS: - .. warning:: :py:class:`SeedWarning` if seed has inappropriate length. - - References: - - https://forum.iota.org/t/why-arent-seeds-longer-than-81-trytes-more-secure/1278 - """ - - def __init__(self, trytes=None): - # type: (Optional[TrytesCompatible]) -> None - if trytes and len(trytes) > Hash.LEN: - warnings.warn("Seed has inappropriate length! " - "(https://forum.iota.org/t/why-arent-seeds-longer-than-81-trytes-more-secure/1278)", - category=SeedWarning) - - super(Seed, self).__init__(trytes) - - @classmethod - def random(cls, length=Hash.LEN): - """ - Generates a random seed using a CSPRNG. - - :param length: - Length of seed, in trytes. - For maximum security, this should always be set to 81. - - References: - - https://forum.iota.org/t/why-arent-seeds-longer-than-81-trytes-more-secure/1278 """ - return super(Seed, cls).random(length) + A TryteString that acts as a seed for crypto functions. + Note: This class is identical to :py:class:`TryteString`, but it has + a distinct type so that seeds can be identified in Python code. -class PrivateKey(TryteString): - """ - A TryteString that acts as a private key, e.g., for generating - message signatures, new addresses, etc. - """ - def __init__(self, trytes, key_index=None, security_level=None): - # type: (TrytesCompatible, Optional[int], Optional[int]) -> None - super(PrivateKey, self).__init__(trytes) - - if len(self._trytes) % FRAGMENT_LENGTH: - raise with_context( - exc = ValueError( - 'Length of {cls} values must be a multiple of {len} trytes.'.format( - cls = type(self).__name__, - len = FRAGMENT_LENGTH, - ), - ), - - context = { - 'trytes': self._trytes, - }, - ) - - self.key_index = key_index - self.security_level = security_level - - def as_json_compatible(self): - # type: () -> dict - return { - 'trytes': self._trytes.decode('ascii'), - 'key_index': self.key_index, - 'security_level': self.security_level, - } - - def get_digest(self): - # type: () -> Digest - """ - Generates the digest used to do the actual signing. + IMPORTANT: For maximum security, a seed must be EXACTLY 81 trytes! - Signing keys can have variable length and tend to be quite long, - which makes them not-well-suited for use in crypto algorithms. + References: - The digest is essentially the result of running the signing key - through a PBKDF, yielding a constant-length hash that can be used - for crypto. + - https://iota.stackexchange.com/q/249 """ - hashes_per_fragment = FRAGMENT_LENGTH // Hash.LEN - - key_fragments = self.iter_chunks(FRAGMENT_LENGTH) - - # The digest will contain one hash per key fragment. - digest = [0] * HASH_LENGTH * len(key_fragments) - # Iterate over each fragment in the key. - for (i, fragment) in enumerate(key_fragments): # type: Tuple[int, TryteString] - fragment_trits = fragment.as_trits() + def __init__(self, trytes=None): + # type: (Optional[TrytesCompatible]) -> None + if trytes and len(trytes) > Hash.LEN: + warnings.warn( + message=( + "Seed has inappropriate length! " + "(see https://iota.stackexchange.com/q/249 for more info)." + ), - key_fragment = [0] * FRAGMENT_LENGTH - hash_trits = [] + category=SeedWarning, + ) - # Within each fragment, iterate over one hash at a time. - for j in range(hashes_per_fragment): - hash_start = j * HASH_LENGTH - hash_end = hash_start + HASH_LENGTH - hash_trits = fragment_trits[hash_start:hash_end] # type: MutableSequence[int] + super(Seed, self).__init__(trytes) - for k in range(26): - sponge = Kerl() - sponge.absorb(hash_trits) - sponge.squeeze(hash_trits) + @classmethod + def random(cls, length=Hash.LEN): + """ + Generates a random seed using a CSPRNG. - key_fragment[hash_start:hash_end] = hash_trits + :param length: + Length of seed, in trytes. - # - # After processing all of the hashes in the fragment, generate a - # final hash and append it to the digest. - # - # Note that we will do this once per fragment in the key, so the - # longer the key is, the longer the digest will be. - # - sponge = Kerl() - sponge.absorb(key_fragment) - sponge.squeeze(hash_trits) + For maximum security, this should always be set to 81, but + you can change it if you're 110% sure you know what you're + doing. - fragment_hash_start = i * HASH_LENGTH - fragment_hash_end = fragment_hash_start + HASH_LENGTH + See https://iota.stackexchange.com/q/249 for more info. + """ + return super(Seed, cls).random(length) - digest[fragment_hash_start:fragment_hash_end] = hash_trits - return Digest(TryteString.from_trits(digest), self.key_index) - - def sign_input_transactions(self, bundle, start_index): - # type: (Bundle, int) -> None +class PrivateKey(TryteString): """ - Signs the inputs starting at the specified index. - - :param bundle: - The bundle that contains the input transactions to sign. - - :param start_index: - The index of the first input transaction. - - If necessary, the resulting signature will be split across - subsequent transactions automatically. + A TryteString that acts as a private key, e.g., for generating + message signatures, new addresses, etc. """ - if not bundle.hash: - raise with_context( - exc = ValueError('Cannot sign inputs without a bundle hash!'), - - context = { - 'bundle': bundle, - 'key_index': self.key_index, - 'start_index': start_index, - }, - ) - - from iota.crypto.signing import SignatureFragmentGenerator - signature_fragment_generator = SignatureFragmentGenerator(self, bundle.hash) - - # We can only fit one signature fragment into each transaction, - # so we have to split the entire signature. - for j in range(self.security_level): - # Do lots of validation before we attempt to sign the - # transaction, and attach lots of context info to any exception. - # This method is likely to be invoked at a very low level in the - # application, so if anything goes wrong, we want to make sure - # it's as easy to troubleshoot as possible! - try: - txn = bundle[start_index+j] - except IndexError as e: - raise with_context( - exc = e, - - context = { - 'bundle': bundle, - 'key_index': self.key_index, - 'current_index': start_index + j, - }, - ) - - # Only inputs can be signed. - if txn.value > 0: - raise with_context( - exc = - ValueError( - 'Attempting to sign non-input transaction #{i} ' - '(value={value}).'.format( - i = txn.current_index, - value = txn.value, - ), - ), - - context = { - 'bundle': bundle, - 'key_index': self.key_index, - 'start_index': start_index, - }, - ) - - if txn.signature_message_fragment: - raise with_context( - exc = - ValueError( - 'Attempting to sign input transaction #{i}, ' - 'but it has a non-empty fragment (is it already signed?).'.format( - i = txn.current_index, - ), - ), - - context = { - 'bundle': bundle, - 'key_index': self.key_index, - 'start_index': start_index, - }, + def __init__(self, trytes, key_index=None, security_level=None): + # type: (TrytesCompatible, Optional[int], Optional[int]) -> None + super(PrivateKey, self).__init__(trytes) + + if len(self._trytes) % FRAGMENT_LENGTH: + raise with_context( + exc=ValueError( + 'Length of {cls} values ' + 'must be a multiple of {len} trytes.'.format( + cls=type(self).__name__, + len=FRAGMENT_LENGTH, + ), + ), + + context={ + 'trytes': self._trytes, + }, + ) + + self.key_index = key_index + self.security_level = security_level + + def as_json_compatible(self): + # type: () -> dict + return { + 'trytes': self._trytes.decode('ascii'), + 'key_index': self.key_index, + 'security_level': self.security_level, + } + + def get_digest(self): + # type: () -> Digest + """ + Generates the digest used to do the actual signing. + + Signing keys can have variable length and tend to be quite long, + which makes them not-well-suited for use in crypto algorithms. + + The digest is essentially the result of running the signing key + through a PBKDF, yielding a constant-length hash that can be + used for crypto. + """ + hashes_per_fragment = FRAGMENT_LENGTH // Hash.LEN + + key_fragments = self.iter_chunks(FRAGMENT_LENGTH) + + # The digest will contain one hash per key fragment. + digest = [0] * HASH_LENGTH * len(key_fragments) + + # Iterate over each fragment in the key. + for i, fragment in enumerate(key_fragments): + fragment_trits = fragment.as_trits() + + key_fragment = [0] * FRAGMENT_LENGTH + hash_trits = [] + + # Within each fragment, iterate over one hash at a time. + for j in range(hashes_per_fragment): + hash_start = j * HASH_LENGTH + hash_end = hash_start + HASH_LENGTH + hash_trits = fragment_trits[hash_start:hash_end] + + for k in range(26): + sponge = Kerl() + sponge.absorb(hash_trits) + sponge.squeeze(hash_trits) + + key_fragment[hash_start:hash_end] = hash_trits + + # After processing all of the hashes in the fragment, + # generate a final hash and append it to the digest. + # + # Note that we will do this once per fragment in the key, so + # the longer the key is, the longer the digest will be. + sponge = Kerl() + sponge.absorb(key_fragment) + sponge.squeeze(hash_trits) + + fragment_hash_start = i * HASH_LENGTH + fragment_hash_end = fragment_hash_start + HASH_LENGTH + + digest[fragment_hash_start:fragment_hash_end] = hash_trits + + return Digest(TryteString.from_trits(digest), self.key_index) + + def sign_input_transactions(self, bundle, start_index): + # type: (Bundle, int) -> None + """ + Signs the inputs starting at the specified index. + + :param bundle: + The bundle that contains the input transactions to sign. + + :param start_index: + The index of the first input transaction. + + If necessary, the resulting signature will be split across + subsequent transactions automatically. + """ + + if not bundle.hash: + raise with_context( + exc=ValueError('Cannot sign inputs without a bundle hash!'), + + context={ + 'bundle': bundle, + 'key_index': self.key_index, + 'start_index': start_index, + }, + ) + + from iota.crypto.signing import SignatureFragmentGenerator + signature_fragment_generator = ( + SignatureFragmentGenerator(self, bundle.hash) ) - txn.signature_message_fragment = next(signature_fragment_generator) + # We can only fit one signature fragment into each transaction, + # so we have to split the entire signature. + for j in range(self.security_level): + # Do lots of validation before we attempt to sign the + # transaction, and attach lots of context info to any + # exception. + # + # This method is likely to be invoked at a very low level in + # the application, so if anything goes wrong, we want to + # make sure it's as easy to troubleshoot as possible! + try: + txn = bundle[start_index + j] + except IndexError as e: + raise with_context( + exc=e, + + context={ + 'bundle': bundle, + 'key_index': self.key_index, + 'current_index': start_index + j, + }, + ) + + # Only inputs can be signed. + if txn.value > 0: + raise with_context( + exc=ValueError( + 'Attempting to sign non-input transaction #{i} ' + '(value={value}).'.format( + i=txn.current_index, + value=txn.value, + ), + ), + + context={ + 'bundle': bundle, + 'key_index': self.key_index, + 'start_index': start_index, + }, + ) + + if txn.signature_message_fragment: + raise with_context( + exc=ValueError( + 'Attempting to sign input transaction #{i}, ' + 'but it has a non-empty fragment ' + '(is it already signed?).'.format( + i=txn.current_index, + ), + ), + + context={ + 'bundle': bundle, + 'key_index': self.key_index, + 'start_index': start_index, + }, + ) + + txn.signature_message_fragment = next(signature_fragment_generator)