Skip to content

Commit

Permalink
part2: new serialization impl in python
Browse files Browse the repository at this point in the history
  • Loading branch information
chaokunyang committed Dec 8, 2024
1 parent d666140 commit aab37a7
Show file tree
Hide file tree
Showing 10 changed files with 599 additions and 769 deletions.
2 changes: 1 addition & 1 deletion python/pyfury/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@
from pyfury.type import ( # noqa: F401 # pylint: disable=unused-import
record_class_factory,
get_qualified_classname,
FuryType,
TypeId,
Int8Type,
Int16Type,
Int32Type,
Expand Down
96 changes: 28 additions & 68 deletions python/pyfury/_fury.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
from pyfury._serializer import (
Serializer,
SerializationContext,
NOT_SUPPORT_CROSS_LANGUAGE,
BufferObject,
PYINT_CLASS_ID,
PYBOOL_CLASS_ID,
Expand All @@ -37,14 +36,14 @@
NO_CLASS_ID,
)
from pyfury.buffer import Buffer
from pyfury.lib import mmh3
from pyfury.meta.metastring import Encoding
from pyfury.resolver import (
MapRefResolver,
NoRefResolver,
NULL_FLAG,
NOT_NULL_VALUE_FLAG,
)
from pyfury.type import FuryType
from pyfury.type import TypeId
from pyfury.util import is_little_endian, set_bit, get_bit, clear_bit

try:
Expand Down Expand Up @@ -73,15 +72,15 @@ class MetaStringBytes:
"data",
"length",
"hashcode",
"encoding",
"dynamic_write_string_id",
)

def __init__(self, data, hashcode=None):
def __init__(self, data, hashcode):
self.data = data
self.length = len(data)
if hashcode is None:
hashcode = (mmh3.hash_buffer(data, 47)[0] >> 8) << 8
self.hashcode = hashcode
self.encoding = Encoding(hashcode & 0xFF)
self.dynamic_write_string_id = DEFAULT_DYNAMIC_WRITE_STRING_ID

def __eq__(self, other):
Expand All @@ -90,6 +89,9 @@ def __eq__(self, other):
def __hash__(self):
return self.hashcode

def decode(self, decoder):
return decoder.decode(self.encoding)


class ClassInfo:
__slots__ = (
Expand All @@ -98,23 +100,24 @@ class ClassInfo:
"serializer",
"namespace_bytes",
"typename_bytes",
"dynamic_type",
)

def __init__(
self,
cls: type = None,
class_id: int = NO_CLASS_ID,
serializer: Serializer = None,
namespace_bytes: bytes = None,
typename_bytes: bytes = None,
namespace_bytes: MetaStringBytes = None,
typename_bytes: MetaStringBytes = None,
dynamic_type: bool = False,
):
self.cls = cls
self.class_id = class_id
self.serializer = serializer
self.namespace_bytes = MetaStringBytes(namespace_bytes)
self.typename_bytes = (
MetaStringBytes(typename_bytes) if typename_bytes else None
)
self.namespace_bytes = namespace_bytes
self.typename_bytes = typename_bytes
self.dynamic_type = dynamic_type

def __repr__(self):
return (
Expand Down Expand Up @@ -346,27 +349,14 @@ def xserialize_ref(self, buffer, obj, serializer=None):
self.xserialize_nonref(buffer, obj, serializer=serializer)

def xserialize_nonref(self, buffer, obj, serializer=None):
cls = type(obj)
serializer = serializer or self.class_resolver.get_serializer(obj=obj)
type_id = serializer.get_xtype_id()
buffer.write_int16(type_id)
if type_id != NOT_SUPPORT_CROSS_LANGUAGE:
if type_id == FuryType.FURY_TYPE_TAG.value:
self.class_resolver.xwrite_type_tag(buffer, cls)
if type_id < NOT_SUPPORT_CROSS_LANGUAGE:
self.class_resolver.xwrite_class(buffer, cls)
if serializer is not None:
serializer.xwrite(buffer, obj)
else:
# Write classname so it can be used for debugging which object doesn't
# support cross-language.
# TODO add a config to disable this to reduce space cost.
self.class_resolver.xwrite_class(buffer, cls)
# serializer may increase reference id multi times internally, thus peer
# cross-language later fields/objects deserialization will use wrong
# reference id since we skip opaque objects deserialization.
# So we stash native objects and serialize all those object at the last.
buffer.write_varint32(len(self._native_objects))
self._native_objects.append(obj)
return
cls = type(obj)
classinfo = self.class_resolver.get_classinfo(cls)
if not classinfo.dynamic_type:
self.class_resolver.write_classinfo(classinfo)
classinfo.serializer.xwrite(buffer, obj)

def deserialize(
self,
Expand Down Expand Up @@ -461,12 +451,11 @@ def deserialize_nonref(self, buffer):
def xdeserialize_ref(self, buffer, serializer=None):
if serializer is None or serializer.need_to_write_ref:
ref_resolver = self.ref_resolver
red_id = ref_resolver.try_preserve_ref_id(buffer)

ref_id = ref_resolver.try_preserve_ref_id(buffer)
# indicates that the object is first read.
if red_id >= NOT_NULL_VALUE_FLAG:
if ref_id >= NOT_NULL_VALUE_FLAG:
o = self.xdeserialize_nonref(buffer, serializer=serializer)
ref_resolver.set_read_object(red_id, o)
ref_resolver.set_read_object(ref_id, o)
return o
else:
return ref_resolver.get_read_object()
Expand All @@ -476,38 +465,9 @@ def xdeserialize_ref(self, buffer, serializer=None):
return self.xdeserialize_nonref(buffer, serializer=serializer)

def xdeserialize_nonref(self, buffer, serializer=None):
type_id = buffer.read_int16()
cls = None
if type_id != NOT_SUPPORT_CROSS_LANGUAGE:
if type_id == FuryType.FURY_TYPE_TAG.value:
cls = self.class_resolver.read_class_by_type_tag(buffer)
if type_id < NOT_SUPPORT_CROSS_LANGUAGE:
if self._peer_language is not Language.PYTHON:
self.class_resolver.read_enum_string_bytes(buffer)
cls = self.class_resolver.get_class_by_type_id(-type_id)
serializer = serializer or self.class_resolver.get_serializer(
type_id=-type_id
)
else:
cls = self.class_resolver.xread_class(buffer)
serializer = serializer or self.class_resolver.get_serializer(
cls=cls, type_id=type_id
)
else:
if type_id != FuryType.FURY_TYPE_TAG.value:
cls = self.class_resolver.get_class_by_type_id(type_id)
serializer = serializer or self.class_resolver.get_serializer(
cls=cls, type_id=type_id
)
assert cls is not None
return serializer.xread(buffer)
else:
class_name = self.class_resolver.xread_classname(buffer)
ordinal = buffer.read_varint32()
if self._peer_language != Language.PYTHON:
return OpaqueObject(self._peer_language, class_name, ordinal)
else:
return self._native_objects[ordinal]
if serializer is None:
serializer = self.class_resolver.read_classinfo(buffer).serializer
return serializer.xread(buffer)

def write_buffer_object(self, buffer, buffer_object: BufferObject):
if self._buffer_callback is None or self._buffer_callback(buffer_object):
Expand Down
Loading

0 comments on commit aab37a7

Please sign in to comment.