Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Updated Serializer to be more efficient #874

Merged
merged 4 commits into from
Oct 1, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
80 changes: 31 additions & 49 deletions doc/reference/serialization.rst
Original file line number Diff line number Diff line change
Expand Up @@ -28,30 +28,26 @@ Each serializable class has to specify the following class members:
:widths: 10, 20

"format_list", "A list containing valid data type primitive names."
"optional_format_list", "A list containing valid data type primitive names. As the name implies, any number of arguments may be supplied."
"names", "Only for VariablePayload classes, the instance fields to bind the data types to."


As an example, we will now define three completely wire-format compatible messages using the three classes.
Each of the messages will serialize to a (four byte) unsigned integer followed by an optional (two byte) unsigned short.
Each of the messages will serialize to a (four byte) unsigned integer followed by an (two byte) unsigned short.
Each instance will have two fields: ``field1`` and ``field2`` corresponding to the integer and short.

.. code-block:: python

class MySerializable(Serializable):

format_list = ['I']
optional_format_list = ['H']
format_list = ['I', 'H']

def __init__(self, field1, field2=None):
def __init__(self, field1, field2):
self.field1 = field1
self.field2 = field2

def to_pack_list(self):
out = [('I', self.field1)]
if self.field2 is not None:
out += [('H', self.field2)]
return out
return [('I', self.field1),
('H', self.field2)]

@classmethod
def from_unpack_list(cls, *args):
Expand All @@ -60,18 +56,15 @@ Each instance will have two fields: ``field1`` and ``field2`` corresponding to t

class MyPayload(Payload):

format_list = ['I']
optional_format_list = ['H']
format_list = ['I', 'H']

def __init__(self, field1, field2=None):
def __init__(self, field1, field2):
self.field1 = field1
self.field2 = field2

def to_pack_list(self):
out = [('I', self.field1)]
if self.field2 is not None:
out += [('H', self.field2)]
return out
return [('I', self.field1),
('H', self.field2)]

@classmethod
def from_unpack_list(cls, *args):
Expand All @@ -80,15 +73,13 @@ Each instance will have two fields: ``field1`` and ``field2`` corresponding to t

class MyVariablePayload(VariablePayload):

format_list = ['I']
optional_format_list = ['H']
format_list = ['I', 'H']
names = ['field1', 'field2']

@vp_compile
class MyCVariablePayload(VariablePayload):

format_list = ['I']
optional_format_list = ['H']
format_list = ['I', 'H']
names = ['field1', 'field2']


Expand All @@ -97,10 +88,10 @@ To show some of the differences, let's check out the output of the following scr

.. code-block:: python

serializable1 = MySerializable(1)
serializable2 = MyPayload(1)
serializable3 = MyVariablePayload(1)
serializable4 = MyCVariablePayload(1)
serializable1 = MySerializable(1, 2)
serializable2 = MyPayload(1, 2)
serializable3 = MyVariablePayload(1, 2)
serializable4 = MyCVariablePayload(1, 2)

print("As string:")
print(serializable1)
Expand All @@ -126,44 +117,35 @@ To show some of the differences, let's check out the output of the following scr
print(timeit.timeit('serializable3.from_unpack_list(1, 2)', number=1000, globals=locals()))
print(timeit.timeit('serializable4.from_unpack_list(1, 2)', number=1000, globals=locals()))

print("Unserialization speed w/o optional:")
print(timeit.timeit('serializable1.from_unpack_list(1)', number=1000, globals=locals()))
print(timeit.timeit('serializable2.from_unpack_list(1)', number=1000, globals=locals()))
print(timeit.timeit('serializable3.from_unpack_list(1)', number=1000, globals=locals()))
print(timeit.timeit('serializable4.from_unpack_list(1)', number=1000, globals=locals()))


.. code-block:: bash

As string:
<__main__.MySerializable object at 0x7fb493a8b1d0>
<__main__.MySerializable object at 0x00000127F1B91F70>
MyPayload
| field1: 1
| field2: None
| field2: 2
MyVariablePayload
| field1: 1
| field2: 2
MyCVariablePayload
| field1: 1
| field2: 2
Field values:
1 None
1 None
1 <undefined>
1 <undefined>
1 2
1 2
1 2
1 2
Serialization speed:
0.0007182089993875707
0.0007311019999178825
0.006567462998646079
0.0008536430013919016
0.00020690000000000985
0.00020630000000000648
0.0015785999999999994
0.0002122999999999986
Unserialization speed:
0.0013339410015760222
0.0014789169999858132
0.01917448600033822
0.0028652559994952753
Unserialization speed w/o optional:
0.001269377000426175
0.0012895309992018156
0.014515060998746776
0.0018252249992656289
0.0003621000000000041
0.00036540000000000183
0.0036703999999999903
0.00045059999999999545

.. _Datatypes Section:

Expand Down
4 changes: 2 additions & 2 deletions ipv8/REST/attestation_endpoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
from ..attestation.wallet.community import AttestationCommunity
from ..keyvault.crypto import default_eccrypto
from ..peer import Peer
from ..util import cast_to_bin, strip_sha1_padding, succeed
from ..util import strip_sha1_padding, succeed


class AttestationEndpoint(BaseEndpoint):
Expand Down Expand Up @@ -315,7 +315,7 @@ async def handle_post(self, request):
if 'metadata' in args:
metadata_unicode = json.loads(b64decode(args['metadata']))
for k, v in metadata_unicode.items():
metadata[cast_to_bin(k)] = cast_to_bin(v)
metadata[k.encode()] = v.encode()
blob = await request.read()

self.attestation_overlay.dump_blob(attribute_name, id_format, blob, metadata)
Expand Down
12 changes: 3 additions & 9 deletions ipv8/REST/isolation_endpoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
from .schema import DefaultResponseSchema, schema
from ..community import _DEFAULT_ADDRESSES
from ..messaging.anonymization.community import TunnelCommunity
from ..util import cast_to_chr


class IsolationEndpoint(BaseEndpoint):
Expand Down Expand Up @@ -58,14 +57,9 @@ async def handle_post(self, request):
if 'exitnode' not in args and 'bootstrapnode' not in args:
return Response({"success": False, "error": "Parameter 'exitnode' or 'bootstrapnode' is required"},
status=HTTP_BAD_REQUEST)
# Attempt to decode the address
try:
address_str = cast_to_chr(args['ip'])
port_str = cast_to_chr(args['port'])
fmt_address = (address_str, int(port_str))
except Exception:
import traceback
return Response({"success": False, "error": traceback.format_exc()}, status=HTTP_BAD_REQUEST)
address_str = args['ip']
port_num = args['port']
fmt_address = (address_str, port_num)
# Actually add the address to the requested service
if 'exitnode' in args:
self.add_exit_node(fmt_address)
Expand Down
2 changes: 1 addition & 1 deletion ipv8/attestation/trustchain/block.py
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,7 @@ def pack(self, signature=True):
args = [self.public_key, self.sequence_number, self.link_public_key, self.link_sequence_number,
self.previous_hash, self.signature if signature else EMPTY_SIG, self.type, self._transaction,
self.timestamp]
return self.serializer.pack_multiple(HalfBlockPayload(*args).to_pack_list())[0]
return self.serializer.pack_serializable(HalfBlockPayload(*args))

def validate_transaction(self, database):
"""
Expand Down
26 changes: 13 additions & 13 deletions ipv8/attestation/trustchain/community.py
Original file line number Diff line number Diff line change
Expand Up @@ -140,16 +140,16 @@ def send_block(self, block, address=None, ttl=1):
Send a block to a specific address, or do a broadcast to known peers if no peer is specified.
"""
global_time = self.claim_global_time()
dist = GlobalTimeDistributionPayload(global_time).to_pack_list()
dist = GlobalTimeDistributionPayload(global_time)

if address:
self.logger.debug("Sending block to (%s:%d) (%s)", address[0], address[1], block)
payload = HalfBlockPayload.from_half_block(block).to_pack_list()
payload = HalfBlockPayload.from_half_block(block)
packet = self._ez_pack(self._prefix, 1, [dist, payload], False)
self.endpoint.send(address, packet)
else:
self.logger.debug("Broadcasting block %s", block)
payload = HalfBlockBroadcastPayload.from_half_block(block, ttl).to_pack_list()
payload = HalfBlockBroadcastPayload.from_half_block(block, ttl)
packet = self._ez_pack(self._prefix, 5, [dist, payload], False)
peers = self.get_peers()
for peer in random.sample(peers, min(len(peers), self.settings.broadcast_fanout)):
Expand All @@ -161,16 +161,16 @@ def send_block_pair(self, block1, block2, address=None, ttl=1):
Send a half block pair to a specific address, or do a broadcast to known peers if no peer is specified.
"""
global_time = self.claim_global_time()
dist = GlobalTimeDistributionPayload(global_time).to_pack_list()
dist = GlobalTimeDistributionPayload(global_time)

if address:
self.logger.debug("Sending block pair to (%s:%d) (%s and %s)", address[0], address[1], block1, block2)
payload = HalfBlockPairPayload.from_half_blocks(block1, block2).to_pack_list()
payload = HalfBlockPairPayload.from_half_blocks(block1, block2)
packet = self._ez_pack(self._prefix, 4, [dist, payload], False)
self.endpoint.send(address, packet)
else:
self.logger.debug("Broadcasting blocks %s and %s", block1, block2)
payload = HalfBlockPairBroadcastPayload.from_half_blocks(block1, block2, ttl).to_pack_list()
payload = HalfBlockPairBroadcastPayload.from_half_blocks(block1, block2, ttl)
packet = self._ez_pack(self._prefix, 6, [dist, payload], False)
peers = self.get_peers()
for peer in random.sample(peers, min(len(peers), self.settings.broadcast_fanout)):
Expand Down Expand Up @@ -464,9 +464,9 @@ def send_crawl_request(self, peer, public_key, start_seq_num, end_seq_num, for_h
hexlify(peer.public_key.key_to_bin())[-8:], start_seq_num, end_seq_num, crawl_id)

global_time = self.claim_global_time()
auth = BinMemberAuthenticationPayload(self.my_peer.public_key.key_to_bin()).to_pack_list()
payload = CrawlRequestPayload(public_key, start_seq_num, end_seq_num, crawl_id).to_pack_list()
dist = GlobalTimeDistributionPayload(global_time).to_pack_list()
auth = BinMemberAuthenticationPayload(self.my_peer.public_key.key_to_bin())
payload = CrawlRequestPayload(public_key, start_seq_num, end_seq_num, crawl_id)
dist = GlobalTimeDistributionPayload(global_time)

packet = self._ez_pack(self._prefix, 2, [auth, dist, payload])
self.endpoint.send(peer.address, packet)
Expand Down Expand Up @@ -562,8 +562,8 @@ def received_crawl_request(self, peer, dist, payload):

if total_count == 0:
global_time = self.claim_global_time()
response_payload = EmptyCrawlResponsePayload(payload.crawl_id).to_pack_list()
dist = GlobalTimeDistributionPayload(global_time).to_pack_list()
response_payload = EmptyCrawlResponsePayload(payload.crawl_id)
dist = GlobalTimeDistributionPayload(global_time)
packet = self._ez_pack(self._prefix, 7, [dist, response_payload], False)
self.endpoint.send(peer.address, packet)
else:
Expand Down Expand Up @@ -627,8 +627,8 @@ def send_crawl_response(self, block, crawl_id, index, total_count, peer):
return

global_time = self.claim_global_time()
payload = CrawlResponsePayload.from_crawl(block, crawl_id, index, total_count).to_pack_list()
dist = GlobalTimeDistributionPayload(global_time).to_pack_list()
payload = CrawlResponsePayload.from_crawl(block, crawl_id, index, total_count)
dist = GlobalTimeDistributionPayload(global_time)

packet = self._ez_pack(self._prefix, 3, [dist, payload], False)
self.endpoint.send(peer.address, packet)
Expand Down
45 changes: 22 additions & 23 deletions ipv8/attestation/wallet/community.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
from ...messaging.payload_headers import BinMemberAuthenticationPayload, GlobalTimeDistributionPayload
from ...peer import Peer
from ...requestcache import RequestCache
from ...util import cast_to_bin, cast_to_chr, maybe_coroutine
from ...util import maybe_coroutine


def synchronized(f):
Expand Down Expand Up @@ -151,16 +151,16 @@ def request_attestation(self, peer, attribute_name, secret_key, metadata={}):

meta_dict = {
"attribute": attribute_name,
"public_key": cast_to_chr(encodebytes(public_key.serialize())),
"public_key": encodebytes(public_key.serialize()).decode(),
"id_format": id_format
}
meta_dict.update(metadata)
metadata = json.dumps(meta_dict).encode()

global_time = self.claim_global_time()
auth = BinMemberAuthenticationPayload(self.my_peer.public_key.key_to_bin()).to_pack_list()
payload = RequestAttestationPayload(metadata).to_pack_list()
dist = GlobalTimeDistributionPayload(global_time).to_pack_list()
auth = BinMemberAuthenticationPayload(self.my_peer.public_key.key_to_bin())
payload = RequestAttestationPayload(metadata)
dist = GlobalTimeDistributionPayload(global_time)

gtime_str = str(global_time).encode('utf-8')
self.request_cache.add(ReceiveAttestationRequestCache(self, peer.mid + gtime_str, secret_key, attribute_name,
Expand All @@ -178,7 +178,7 @@ async def on_request_attestation(self, peer, dist, payload):
"""
metadata = json.loads(payload.metadata)
attribute = metadata.pop('attribute')
pubkey_b64 = cast_to_bin(metadata.pop('public_key'))
pubkey_b64 = metadata.pop('public_key').encode()
id_format = metadata.pop('id_format')
id_algorithm = self.get_id_algorithm(id_format)

Expand All @@ -198,7 +198,7 @@ def on_attestation_complete(self, unserialized, secret_key, peer, name, attestat
"""
We got an Attestation delivered to us.
"""
self.attestation_keys[cast_to_bin(attestation_hash)] = (secret_key, id_format)
self.attestation_keys[attestation_hash] = (secret_key, id_format)
self.database.insert_attestation(unserialized, attestation_hash, secret_key, id_format)
self.attestation_request_complete_callback(self.my_peer, name, attestation_hash, id_format, peer)

Expand Down Expand Up @@ -230,9 +230,9 @@ def create_verify_attestation_request(self, socket_address, attestation_hash, id
self.request_cache.add(ReceiveAttestationVerifyCache(self, attestation_hash, id_format))

global_time = self.claim_global_time()
auth = BinMemberAuthenticationPayload(self.my_peer.public_key.key_to_bin()).to_pack_list()
payload = VerifyAttestationRequestPayload(attestation_hash).to_pack_list()
dist = GlobalTimeDistributionPayload(global_time).to_pack_list()
auth = BinMemberAuthenticationPayload(self.my_peer.public_key.key_to_bin())
payload = VerifyAttestationRequestPayload(attestation_hash)
dist = GlobalTimeDistributionPayload(global_time)

packet = self._ez_pack(self._prefix, 1, [auth, dist, payload])
self.endpoint.send(socket_address, packet)
Expand Down Expand Up @@ -270,9 +270,9 @@ def send_attestation(self, socket_address, blob, global_time=None):
self.logger.debug("Sending attestation chunk %d to %s", sequence_number, str(socket_address))
if global_time is None:
global_time = self.claim_global_time()
auth = BinMemberAuthenticationPayload(self.my_peer.public_key.key_to_bin()).to_pack_list()
payload = AttestationChunkPayload(sha1(blob).digest(), sequence_number, blob_chunk).to_pack_list()
dist = GlobalTimeDistributionPayload(global_time).to_pack_list()
auth = BinMemberAuthenticationPayload(self.my_peer.public_key.key_to_bin())
payload = AttestationChunkPayload(sha1(blob).digest(), sequence_number, blob_chunk)
dist = GlobalTimeDistributionPayload(global_time)
packet = self._ez_pack(self._prefix, 2, [auth, dist, payload])
self.endpoint.send(socket_address, packet)

Expand Down Expand Up @@ -364,9 +364,9 @@ def on_received_attestation(self, peer, attestation, attestation_hash):
self.request_cache.add(PendingChallengeCache(self, sha1(challenge).digest(), cache, cache.id_format))

global_time = self.claim_global_time()
auth = BinMemberAuthenticationPayload(self.my_peer.public_key.key_to_bin()).to_pack_list()
payload = ChallengePayload(attestation_hash, challenge).to_pack_list()
dist = GlobalTimeDistributionPayload(global_time).to_pack_list()
auth = BinMemberAuthenticationPayload(self.my_peer.public_key.key_to_bin())
payload = ChallengePayload(attestation_hash, challenge)
dist = GlobalTimeDistributionPayload(global_time)

packet = self._ez_pack(self._prefix, 3, [auth, dist, payload])
self.endpoint.send(peer.address, packet)
Expand All @@ -382,11 +382,10 @@ def on_challenge(self, peer, dist, payload):
attestation = self.cached_attestation_blobs[payload.attestation_hash]

global_time = self.claim_global_time()
auth = BinMemberAuthenticationPayload(self.my_peer.public_key.key_to_bin()).to_pack_list()
auth = BinMemberAuthenticationPayload(self.my_peer.public_key.key_to_bin())
payload = ChallengeResponsePayload(challenge_hash,
algorithm.create_challenge_response(SK, attestation, payload.challenge)
).to_pack_list()
dist = GlobalTimeDistributionPayload(global_time).to_pack_list()
algorithm.create_challenge_response(SK, attestation, payload.challenge))
dist = GlobalTimeDistributionPayload(global_time)

packet = self._ez_pack(self._prefix, 4, [auth, dist, payload])
self.endpoint.send(peer.address, packet)
Expand Down Expand Up @@ -450,9 +449,9 @@ def on_challenge_response(self, peer, dist, payload):
cache.id_format, honesty_check_byte))

global_time = self.claim_global_time()
auth = BinMemberAuthenticationPayload(self.my_peer.public_key.key_to_bin()).to_pack_list()
payload = ChallengePayload(proving_cache.hash, challenge).to_pack_list()
dist = GlobalTimeDistributionPayload(global_time).to_pack_list()
auth = BinMemberAuthenticationPayload(self.my_peer.public_key.key_to_bin())
payload = ChallengePayload(proving_cache.hash, challenge)
dist = GlobalTimeDistributionPayload(global_time)

packet = self._ez_pack(self._prefix, 3, [auth, dist, payload])
self.endpoint.send(peer.address, packet)
Loading