diff --git a/src/hades/agent/signature.py b/src/hades/agent/signature.py index fd25f047..15a55287 100644 --- a/src/hades/agent/signature.py +++ b/src/hades/agent/signature.py @@ -1,132 +1,206 @@ # -*- coding: utf-8 -*- """ -A serializer that prepends a JSON header with an ed25519 signature before the -data. - -The header is a completely separate JSON document, that is separated from the -body by optional whitespace. This allows parsing the header without parsing the -payload. Also it is not required to encode the payload. Raw data can also be -inspected by humans easily. The only problem with this scheme is, that the -payload may not start with space, because this white space would be striped -and signature verification would fail. - -The JSON payload ``{"Foobar": 1}`` would be encoded as follows: - -.. code-block :: json - - { - "signature": "u9rPWbQh3TNpW8wrimL5SHtelkgm32cTPfzaUgp+djMDGz/Vjf/mb6BtQcXpJ1noJl2xILTWxhrpqtv9ykf2Bw==", - "signer": "w7ADgLSZTlXIDY/qbcfxUCeXht8VcpGoJYOj0lQu1Qw=", - "content_type": "application/json", - "content_encoding": "utf-8" - } - { - "Foobar": 1 - } +A serializer that encodes data using another inner serializer and signs the +encoded binary data with an ed25519 signature. A single inner serializer can be +specified for serialization, but multiple inner serializers can be configured +for deserialization. + +The binary data encoded by the inner serializer is prepended with the signature, +the public key that produced the signature message, the content type and the +content encoding of the inner serializer. The content type and the content +encoding are delimited by a null byte (\x00). + +The output of the inner serializer is left as is as payload without any +additional encoding. + +.. code-block :: text + + +-----------------------------+----+ <--+ + | Signature (64 bytes) | | + | | H + +--------------------------------- + e <--+ + | Signer (32 bytes) | a M + +--------------------------------- + d e + | Content Type (variable) | 00 | e s + +-----------------------------+----+ r s + | Content Encoding (variable) | 00 | | a + +-----------------------------+----+ <--+ g + | Payload (variable) | e + +----------------------------------+ <--+ + +The signature is computed over everything (including signer, content type, +content encoding). + +The header may be armored by encoding it using base 64 (without newlines) and +placing newline (\n) between header and payload: + +.. code-block :: text + + +-----------------------------+----+ + | Header (variable, base 64) | \n | + +--------------------------------- + + | Payload (variable) | + +----------------------------------+ """ -import io -import json -from typing import Iterable, Union +import base64 +from functools import partial +from typing import Iterable, Optional, Union +import nacl.bindings import nacl.encoding import nacl.signing -from kombu.serialization import dumps, loads, register as kombu_register -from kombu.utils.encoding import bytes_to_str - -__all__ = ['ED25519Serializer', 'register'] - - -class ED25519Serializer(object): - key_codec = nacl.encoding.Base64Encoder() - json_decoder = json.JSONDecoder() - whitespace = b' \t\n\r' - - def __init__(self, signing_key: nacl.signing.SigningKey, - verify_keys: Iterable[nacl.signing.VerifyKey], - serializer='json', content_encoding='utf-8'): - self._signing_key = signing_key +from kombu.serialization import dumps, loads, registry + +__all__ = ("ED25519Serializer", "register", "register_armored", "register_raw") +# Extracts signer from a raw signed message +from hades.common.util import qualified_name + +SIGNER_SLICE = slice( + nacl.bindings.crypto_sign_BYTES, + nacl.bindings.crypto_sign_BYTES + nacl.bindings.crypto_sign_PUBLICKEYBYTES +) +# Extracts message from a signed message +MESSAGE_SLICE = slice( + nacl.bindings.crypto_sign_BYTES + nacl.bindings.crypto_sign_PUBLICKEYBYTES, + None +) + + +class ED25519Serializer: + def __init__( + self, signing_key: nacl.signing.SigningKey, + verify_keys: Iterable[nacl.signing.VerifyKey], + inner_serializer: str = "json", + accept: Optional[Iterable] = None, + armored: bool = False, + ): + # Create copies of keys with raw encoder + self._signing_key = nacl.signing.SigningKey(bytes(signing_key)) + self._signer = bytes(signing_key.verify_key) self._verify_keys = { - self.key_codec.encode(bytes(key)).decode('ascii'): key + bytes(key): nacl.signing.VerifyKey(bytes(key)) for key in verify_keys } - self._serializer = serializer - self._signer = self.key_codec.encode( - bytes(self._signing_key.verify_key)).decode('ascii') - self._content_encoding = content_encoding + self._inner_serializer = inner_serializer + self._accept = frozenset(accept) if accept else None + self._armored = armored - def _ensure_bytes(self, data: Union[bytes, str]): + @staticmethod + def _ensure_bytes(data: Union[bytes, str], content_encoding: str) -> bytes: if isinstance(data, bytes): return data - return data.encode(self._content_encoding) + if isinstance(data, bytearray): + return bytes(data) + elif isinstance(data, str): + return data.encode(content_encoding) + elif isinstance(data, memoryview): + return data.tobytes() + else: + raise TypeError( + f"Argument should be bytes, bytearray, str, or memoryview, " + f"not {qualified_name(type(data))}" + ) def serialize(self, data): - content_type, content_encoding, body = dumps( - data, serializer=self._serializer) - if content_encoding != self._content_encoding: - raise ValueError("Content encoding of inner serializer {!r} must " - "match ({!r} != {!r})" - .format(self._serializer, content_encoding, - self._content_encoding)) - body = self._ensure_bytes(body) - if len(body) > 0 and body[0] in self.whitespace: - raise ValueError("Inner data may not begin with the following " - "characters {!r}" - .format(str(self.whitespace))) - message = self._signing_key.sign(body) - signature = self.key_codec.encode(message.signature).decode('ascii') - header = { - 'signature': signature, - 'signer': self._signer, - 'content_type': content_type, - 'content_encoding': content_encoding, - } - buffer = io.BytesIO() - wrapper = io.TextIOWrapper(buffer, self._content_encoding, - write_through=True) - with wrapper: - json.dump(header, wrapper) - buffer.write(b"\n") - buffer.write(message.message) - return buffer.getvalue() - - def parse_header(self, data): - return self.json_decoder.raw_decode(data.decode(self._content_encoding)) - - def deserialize(self, data): - data = self._ensure_bytes(data) - header, end = self.parse_header(data) - # Skip whitespace - length = len(data) - while end < length and data[end] in self.whitespace: - end += 1 - header, body = header, data[end:] - - signer, signature, content_type, content_encoding = ( - header['signer'], header['signature'], - header['content_type'], header['content_encoding'] + content_type, content_encoding, payload = dumps( + data, serializer=self._inner_serializer ) - signature = self.key_codec.decode(signature) - if content_encoding != self._content_encoding: - raise ValueError("Invalid inner content encoding ({!r} != {!r})" - .format(content_encoding, self._content_encoding)) + payload = self._ensure_bytes(payload, content_encoding) + content_type = self._ensure_bytes(content_type, "us-ascii") + content_encoding = self._ensure_bytes(content_encoding, "us-ascii") + message = b"".join(( + self._signer, + content_type, + b"\x00", + content_encoding, + b"\x00", + payload, + )) + signed_message = self._signing_key.sign(message) + if self._armored: + return base64.b64encode( + signed_message[:-len(payload)] + ) + b"\n" + payload + else: + return signed_message + + def deserialize(self, data: bytes): + if self._armored: + header, sep, payload = data.partition(b"\n") + if not sep: + raise ValueError("Invalid message: Expected armored header") + header = base64.b64decode(header, validate=True) + data = header + payload + signer = data[SIGNER_SLICE] try: verify_key = self._verify_keys[signer] except KeyError: raise ValueError("Unknown signer {!r}".format(signer)) from None - verify_key.verify(body, signature) - return loads(bytes_to_str(body), content_type, content_encoding, - force=True) - - -def register(signing_key: nacl.signing.SigningKey, - verify_keys: Iterable[nacl.signing.VerifyKey], - name: str = 'ed25519', serializer='json', - content_type: str = 'application/x-data-ed25519', - content_encoding: str = 'utf-8'): - """Register serializer with :mod:`kombu`""" - s = ED25519Serializer(signing_key, verify_keys, serializer, - content_encoding) - kombu_register(name, s.serialize, s.deserialize, content_type, - content_encoding) + message = verify_key.verify(data) + # Skip signer + message = message[nacl.bindings.crypto_sign_PUBLICKEYBYTES:] + + try: + content_type, content_encoding, payload = message.split( + b"\x00", maxsplit=2 + ) + except ValueError: + raise ValueError( + "Invalid message: No content_type/content_encoding before " + "payload" + ) from None + content_type = content_type.decode("us-ascii") + content_encoding = content_encoding.decode("us-ascii") + return loads( + payload, content_type, content_encoding, + accept=self._accept, + ) + + +def register( + signing_key: nacl.signing.SigningKey, + verify_keys: Iterable[nacl.signing.VerifyKey], + name: str, + content_type: str, + armored: bool, + *, + inner_serializer: str = "json", + accept: Optional[Iterable[str]] = ("application/json",), +): + """ + Register serializer with :mod:`kombu`. + + :param signing_key: The key used for signing serialized messages + :param verify_keys: The keys used for verifying messages before + deserialization + :param inner_serializer: The serializer to use for serializing the data + before signing + :param accept: If not :obj:`None` an iterable of content types that may be + decoded + :param name: Name of the serializer + :param content_type: Content type of the serializer + :param armored: Specifies whether the leading signature header should be + base64 encoded + """ + s = ED25519Serializer( + signing_key, verify_keys, inner_serializer, accept, armored, + ) + registry.register( + name, s.serialize, s.deserialize, content_type, "binary", + ) + + +register_raw = partial( + register, + name="ed25519", + content_type="application/x.data.ed25519", + armored=False, +) +register_armored = partial( + register, + name="ed25519.armored", + content_type="application/x.data.ed25519.armored", + armored=True, +) diff --git a/tests/test_signature.py b/tests/test_signature.py new file mode 100644 index 00000000..1a091fb8 --- /dev/null +++ b/tests/test_signature.py @@ -0,0 +1,196 @@ +# -*- coding: utf-8 -*- +import operator +from functools import partial +from typing import Any, Callable, Iterable, Optional, Tuple, Union + +from kombu.exceptions import ContentDisallowed, SerializerNotInstalled +from kombu.serialization import registry +import nacl.bindings +import nacl.encoding +import nacl.exceptions +import nacl.signing +import pytest + +from hades.agent.signature import ED25519Serializer, register_armored, register_raw + +dual_serializer_factory = Callable[ + [str, Optional[Iterable[str]], bool], + Tuple[ED25519Serializer, ED25519Serializer] +] + +single_serializer_factory = Callable[ + [str, Optional[Iterable[str]], bool], + ED25519Serializer +] + + +@pytest.fixture(scope='session') +def alice_key() -> nacl.signing.SigningKey: + return nacl.signing.SigningKey.generate() + + +@pytest.fixture(scope='session') +def bob_key() -> nacl.signing.SigningKey: + return nacl.signing.SigningKey.generate() + + +@pytest.fixture(scope='session') +def carol_key() -> nacl.signing.SigningKey: + return nacl.signing.SigningKey.generate() + + +@pytest.fixture(scope='session') +def make_dual_serializers(alice_key, bob_key) -> dual_serializer_factory: + def _make_serializers( + inner_serializer: str, accept: Optional[Iterable[str]] = None, armored: bool = False, + ) -> Tuple[ED25519Serializer, ED25519Serializer]: + """Create ed25519 serializers with a given inner serializer with mutual trust between Alice and Bob""" + alice_serializer = ED25519Serializer(alice_key, [bob_key.verify_key], inner_serializer, accept, armored) + bob_serializer = ED25519Serializer(bob_key, [alice_key.verify_key], inner_serializer, accept, armored) + return alice_serializer, bob_serializer + + return _make_serializers + + +@pytest.fixture(scope='session') +def make_single_serializer(carol_key) -> single_serializer_factory: + def _make_serializer( + serializer: str, accept: Optional[Iterable[str]] = None, armored: bool = False, + ) -> ED25519Serializer: + return ED25519Serializer(carol_key, [carol_key.verify_key], serializer, accept, armored) + return _make_serializer + + +@pytest.fixture(params={ + 'raw': [ + 'secret string message', + b'secret bytes message' + ], + 'json': [ + 'test', + 1, + 3.14, + True, + False, + None, + { + 'string': 'test', + 'int': 1, + 'float': 3.14, + 'True': True, + 'False': True, + 'None': None, + 'list': ['test', 1, 3.14, True, False, None], + 'dict': { + 'de': 'Heizölrückstoßabdämpfung', + 'hu': 'Árvíztűrő tükörfúrógép', + 'el': 'Ξεσκεπάζω τὴν ψυχοφθόρα βδελυγμία', + }, + }, + ] +}.items(), ids=operator.itemgetter(0)) +def test_values(request) -> Tuple[str, Any]: + return request.param + + +@pytest.mark.parametrize( + 'armored', + (True, False), + ids=('raw', 'armored'), +) +def test_serdes_with_dual_keys( + test_values: Tuple[str, Any], + make_dual_serializers: dual_serializer_factory, + armored: bool, +): + inner_serializer, values = test_values + alice_serializer, bob_serializer = make_dual_serializers(inner_serializer, armored=armored) + for value in values: + message = alice_serializer.serialize(value) + assert value == bob_serializer.deserialize(message) + + +@pytest.mark.parametrize( + 'armored', + (True, False), + ids=('raw', 'armored'), +) +def test_serdes_with_single_key( + test_values: Tuple[str, Any], + make_single_serializer: single_serializer_factory, + armored: bool, +): + inner_serializer, values = test_values + serializer = make_single_serializer(inner_serializer, armored=armored) + + for data in values: + serialized_data = serializer.serialize(data) + assert data != serialized_data + deserialized_data = serializer.deserialize(serialized_data) + assert serialized_data != deserialized_data + assert data == deserialized_data + + +def test_accept(make_single_serializer): + # Serialize with the json serializer, but accept only raw strings or bytes + serializer = make_single_serializer( + 'json', ['application/data', 'application/text'] + ) + data = 'secret message' + serialized_data = serializer.serialize(data) + with pytest.raises(ContentDisallowed): + serializer.deserialize(serialized_data) + + +def test_invalid_signature(make_single_serializer): + serializer = make_single_serializer('raw') + data = 'secret message' + serialized_data = bytearray(serializer.serialize(data)) + for i in range(nacl.bindings.crypto_sign_BYTES): + with pytest.raises(nacl.exceptions.BadSignatureError): + orig = serialized_data[i] + serialized_data[i] = ~orig & 0xff + serializer.deserialize(bytes(serialized_data)) + serialized_data[i] = orig + + +def test_unknown_signer( + make_single_serializer: single_serializer_factory, + make_dual_serializers: dual_serializer_factory, +): + alice, bob = make_dual_serializers('raw') + carol = make_single_serializer('raw') + carols_message = carol.serialize(b'secret message') + with pytest.raises(ValueError): + alice.deserialize(carols_message) + + +@pytest.mark.parametrize( + 'register_func', + (register_raw, register_armored), + ids=('raw', 'armored'), +) +def test_register( + make_dual_serializers: dual_serializer_factory, + register_func: partial, +): + inner_serializer = 'raw' + accept = [inner_serializer] + expected_data = 'secret message' + name = register_func.keywords['name'] + armored = register_func.keywords['armored'] + expected_content_type = register_func.keywords['content_type'] + alice_serializer, bob_serializer = make_dual_serializers(inner_serializer, accept, armored) + with pytest.raises(SerializerNotInstalled): + registry.dumps(expected_data, name) + try: + register_func( + alice_serializer._signing_key, bob_serializer._verify_keys, + inner_serializer=inner_serializer, accept=accept, + ) + got_content_type, got_content_encoding, got_payload = registry.dumps(expected_data, name) + finally: + registry.unregister(name) + assert expected_content_type == got_content_type + assert 'binary' == got_content_encoding + assert expected_data == bob_serializer.deserialize(got_payload)