Skip to content

Commit

Permalink
Change signature encoding
Browse files Browse the repository at this point in the history
The former JSON document stream based encoding has various shortcomings.
The new encoding is more robust, because no ambiguity or tolerance with
regards to whitespace exists.

There are two variants: Binary and base64-encoded.

The signature now also covers all the metadata (except the signature
itself of course):

    +-----------------------------+----+  <--+
    | Signature (64 bytes)             |     |
    |                                  |     H
    +--------------------------------- +     e   <--+
    | Signer (32 bytes)                |     a      M
    +--------------------------------- +     d      e
    | Content Type (variable)     | 00 |     e      s
    +-----------------------------+----+     r      s
    | Content Encoding (variable) | 00 |     |      a
    +-----------------------------+----+  <--+      g
    | Payload (variable)               |            e
    +----------------------------------+         <--+

With base64 encoding the header becomes:

    +-----------------------------+----+
    | Header (variable, base 64)  | \n |
    +--------------------------------- +
    | Payload (variable)               |
    +----------------------------------+

Addresses #68
  • Loading branch information
sebschrader committed Sep 10, 2022
1 parent 9fc8047 commit 5a9c4b7
Show file tree
Hide file tree
Showing 2 changed files with 381 additions and 111 deletions.
296 changes: 185 additions & 111 deletions src/hades/agent/signature.py
Original file line number Diff line number Diff line change
@@ -1,132 +1,206 @@
# -*- coding: utf-8 -*-
"""
A serializer that prepends a JSON header with an ed25519 signature before the
data.
The header is a completely separate JSON document, that is separated from the
body by optional whitespace. This allows parsing the header without parsing the
payload. Also it is not required to encode the payload. Raw data can also be
inspected by humans easily. The only problem with this scheme is, that the
payload may not start with space, because this white space would be striped
and signature verification would fail.
The JSON payload ``{"Foobar": 1}`` would be encoded as follows:
.. code-block :: json
{
"signature": "u9rPWbQh3TNpW8wrimL5SHtelkgm32cTPfzaUgp+djMDGz/Vjf/mb6BtQcXpJ1noJl2xILTWxhrpqtv9ykf2Bw==",
"signer": "w7ADgLSZTlXIDY/qbcfxUCeXht8VcpGoJYOj0lQu1Qw=",
"content_type": "application/json",
"content_encoding": "utf-8"
}
{
"Foobar": 1
}
A serializer that encodes data using another inner serializer and signs the
encoded binary data with an ed25519 signature. A single inner serializer can be
specified for serialization, but multiple inner serializers can be configured
for deserialization.
The binary data encoded by the inner serializer is prepended with the signature,
the public key that produced the signature message, the content type and the
content encoding of the inner serializer. The content type and the content
encoding are delimited by a null byte (\x00).
The output of the inner serializer is left as is as payload without any
additional encoding.
.. code-block :: text
+-----------------------------+----+ <--+
| Signature (64 bytes) | |
| | H
+--------------------------------- + e <--+
| Signer (32 bytes) | a M
+--------------------------------- + d e
| Content Type (variable) | 00 | e s
+-----------------------------+----+ r s
| Content Encoding (variable) | 00 | | a
+-----------------------------+----+ <--+ g
| Payload (variable) | e
+----------------------------------+ <--+
The signature is computed over everything (including signer, content type,
content encoding).
The header may be armored by encoding it using base 64 (without newlines) and
placing newline (\n) between header and payload:
.. code-block :: text
+-----------------------------+----+
| Header (variable, base 64) | \n |
+--------------------------------- +
| Payload (variable) |
+----------------------------------+
"""
import io
import json
from typing import Iterable, Union
import base64
from functools import partial
from typing import Iterable, Optional, Union

import nacl.bindings
import nacl.encoding
import nacl.signing
from kombu.serialization import dumps, loads, register as kombu_register
from kombu.utils.encoding import bytes_to_str

__all__ = ['ED25519Serializer', 'register']


class ED25519Serializer(object):
key_codec = nacl.encoding.Base64Encoder()
json_decoder = json.JSONDecoder()
whitespace = b' \t\n\r'

def __init__(self, signing_key: nacl.signing.SigningKey,
verify_keys: Iterable[nacl.signing.VerifyKey],
serializer='json', content_encoding='utf-8'):
self._signing_key = signing_key
from kombu.serialization import dumps, loads, registry

__all__ = ("ED25519Serializer", "register", "register_armored", "register_raw")
# Extracts signer from a raw signed message
from hades.common.util import qualified_name

SIGNER_SLICE = slice(
nacl.bindings.crypto_sign_BYTES,
nacl.bindings.crypto_sign_BYTES + nacl.bindings.crypto_sign_PUBLICKEYBYTES
)
# Extracts message from a signed message
MESSAGE_SLICE = slice(
nacl.bindings.crypto_sign_BYTES + nacl.bindings.crypto_sign_PUBLICKEYBYTES,
None
)


class ED25519Serializer:
def __init__(
self, signing_key: nacl.signing.SigningKey,
verify_keys: Iterable[nacl.signing.VerifyKey],
inner_serializer: str = "json",
accept: Optional[Iterable] = None,
armored: bool = False,
):
# Create copies of keys with raw encoder
self._signing_key = nacl.signing.SigningKey(bytes(signing_key))
self._signer = bytes(signing_key.verify_key)
self._verify_keys = {
self.key_codec.encode(bytes(key)).decode('ascii'): key
bytes(key): nacl.signing.VerifyKey(bytes(key))
for key in verify_keys
}
self._serializer = serializer
self._signer = self.key_codec.encode(
bytes(self._signing_key.verify_key)).decode('ascii')
self._content_encoding = content_encoding
self._inner_serializer = inner_serializer
self._accept = frozenset(accept) if accept else None
self._armored = armored

def _ensure_bytes(self, data: Union[bytes, str]):
@staticmethod
def _ensure_bytes(data: Union[bytes, str], content_encoding: str) -> bytes:
if isinstance(data, bytes):
return data
return data.encode(self._content_encoding)
if isinstance(data, bytearray):
return bytes(data)
elif isinstance(data, str):
return data.encode(content_encoding)
elif isinstance(data, memoryview):
return data.tobytes()
else:
raise TypeError(
f"Argument should be bytes, bytearray, str, or memoryview, "
f"not {qualified_name(type(data))}"
)

def serialize(self, data):
content_type, content_encoding, body = dumps(
data, serializer=self._serializer)
if content_encoding != self._content_encoding:
raise ValueError("Content encoding of inner serializer {!r} must "
"match ({!r} != {!r})"
.format(self._serializer, content_encoding,
self._content_encoding))
body = self._ensure_bytes(body)
if len(body) > 0 and body[0] in self.whitespace:
raise ValueError("Inner data may not begin with the following "
"characters {!r}"
.format(str(self.whitespace)))
message = self._signing_key.sign(body)
signature = self.key_codec.encode(message.signature).decode('ascii')
header = {
'signature': signature,
'signer': self._signer,
'content_type': content_type,
'content_encoding': content_encoding,
}
buffer = io.BytesIO()
wrapper = io.TextIOWrapper(buffer, self._content_encoding,
write_through=True)
with wrapper:
json.dump(header, wrapper)
buffer.write(b"\n")
buffer.write(message.message)
return buffer.getvalue()

def parse_header(self, data):
return self.json_decoder.raw_decode(data.decode(self._content_encoding))

def deserialize(self, data):
data = self._ensure_bytes(data)
header, end = self.parse_header(data)
# Skip whitespace
length = len(data)
while end < length and data[end] in self.whitespace:
end += 1
header, body = header, data[end:]

signer, signature, content_type, content_encoding = (
header['signer'], header['signature'],
header['content_type'], header['content_encoding']
content_type, content_encoding, payload = dumps(
data, serializer=self._inner_serializer
)
signature = self.key_codec.decode(signature)
if content_encoding != self._content_encoding:
raise ValueError("Invalid inner content encoding ({!r} != {!r})"
.format(content_encoding, self._content_encoding))
payload = self._ensure_bytes(payload, content_encoding)
content_type = self._ensure_bytes(content_type, "us-ascii")
content_encoding = self._ensure_bytes(content_encoding, "us-ascii")
message = b"".join((
self._signer,
content_type,
b"\x00",
content_encoding,
b"\x00",
payload,
))
signed_message = self._signing_key.sign(message)
if self._armored:
return base64.b64encode(
signed_message[:-len(payload)]
) + b"\n" + payload
else:
return signed_message

def deserialize(self, data: bytes):
if self._armored:
header, sep, payload = data.partition(b"\n")
if not sep:
raise ValueError("Invalid message: Expected armored header")
header = base64.b64decode(header, validate=True)
data = header + payload
signer = data[SIGNER_SLICE]

try:
verify_key = self._verify_keys[signer]
except KeyError:
raise ValueError("Unknown signer {!r}".format(signer)) from None
verify_key.verify(body, signature)
return loads(bytes_to_str(body), content_type, content_encoding,
force=True)


def register(signing_key: nacl.signing.SigningKey,
verify_keys: Iterable[nacl.signing.VerifyKey],
name: str = 'ed25519', serializer='json',
content_type: str = 'application/x-data-ed25519',
content_encoding: str = 'utf-8'):
"""Register serializer with :mod:`kombu`"""
s = ED25519Serializer(signing_key, verify_keys, serializer,
content_encoding)
kombu_register(name, s.serialize, s.deserialize, content_type,
content_encoding)
message = verify_key.verify(data)
# Skip signer
message = message[nacl.bindings.crypto_sign_PUBLICKEYBYTES:]

try:
content_type, content_encoding, payload = message.split(
b"\x00", maxsplit=2
)
except ValueError:
raise ValueError(
"Invalid message: No content_type/content_encoding before "
"payload"
) from None
content_type = content_type.decode("us-ascii")
content_encoding = content_encoding.decode("us-ascii")
return loads(
payload, content_type, content_encoding,
accept=self._accept,
)


def register(
signing_key: nacl.signing.SigningKey,
verify_keys: Iterable[nacl.signing.VerifyKey],
name: str,
content_type: str,
armored: bool,
*,
inner_serializer: str = "json",
accept: Optional[Iterable[str]] = ("application/json",),
):
"""
Register serializer with :mod:`kombu`.
:param signing_key: The key used for signing serialized messages
:param verify_keys: The keys used for verifying messages before
deserialization
:param inner_serializer: The serializer to use for serializing the data
before signing
:param accept: If not :obj:`None` an iterable of content types that may be
decoded
:param name: Name of the serializer
:param content_type: Content type of the serializer
:param armored: Specifies whether the leading signature header should be
base64 encoded
"""
s = ED25519Serializer(
signing_key, verify_keys, inner_serializer, accept, armored,
)
registry.register(
name, s.serialize, s.deserialize, content_type, "binary",
)


register_raw = partial(
register,
name="ed25519",
content_type="application/x.data.ed25519",
armored=False,
)
register_armored = partial(
register,
name="ed25519.armored",
content_type="application/x.data.ed25519.armored",
armored=True,
)
Loading

0 comments on commit 5a9c4b7

Please sign in to comment.