diff --git a/java/fury-core/src/main/java/org/apache/fury/serializer/EnumSerializer.java b/java/fury-core/src/main/java/org/apache/fury/serializer/EnumSerializer.java index 7a0f78dc13..a2f9a420ba 100644 --- a/java/fury-core/src/main/java/org/apache/fury/serializer/EnumSerializer.java +++ b/java/fury-core/src/main/java/org/apache/fury/serializer/EnumSerializer.java @@ -100,6 +100,20 @@ public Enum read(MemoryBuffer buffer) { } } + @Override + public void xwrite(MemoryBuffer buffer, Enum value) { + buffer.writeVarUint32Small7(value.ordinal()); + } + + @Override + public Enum xread(MemoryBuffer buffer) { + int value = buffer.readVarUint32Small7(); + if (value >= enumConstants.length) { + return handleNonexistentEnumValue(value); + } + return enumConstants[value]; + } + private Enum handleNonexistentEnumValue(int value) { if (fury.getConfig().deserializeNonexistentEnumValueAsNull()) { return null; diff --git a/python/pyfury/__init__.py b/python/pyfury/__init__.py index c4396421f8..bfe719d083 100644 --- a/python/pyfury/__init__.py +++ b/python/pyfury/__init__.py @@ -33,7 +33,6 @@ Language, ClassInfo, OpaqueObject, - ComplexObjectSerializer, ) else: from pyfury._fury import ( # noqa: F401,F403,F811 # pylint: disable=unused-import @@ -42,9 +41,9 @@ ClassInfo, OpaqueObject, ) - from pyfury._struct import ( # noqa: F401,F403,F811 # pylint: disable=unused-import - ComplexObjectSerializer, - ) +from pyfury._struct import ( # noqa: F401,F403,F811 # pylint: disable=unused-import + ComplexObjectSerializer, +) from pyfury.serializer import * # noqa: F401,F403 # pylint: disable=unused-import from pyfury.type import ( # noqa: F401 # pylint: disable=unused-import record_class_factory, diff --git a/python/pyfury/_fury.py b/python/pyfury/_fury.py index 1304486467..13cf075f1d 100644 --- a/python/pyfury/_fury.py +++ b/python/pyfury/_fury.py @@ -15,62 +15,36 @@ # specific language governing permissions and limitations # under the License. 
-import array -import dataclasses -import datetime import enum import logging import os import sys import warnings from dataclasses import dataclass -from typing import Dict, Tuple, TypeVar, Union, Iterable +from typing import Union, Iterable -from pyfury.lib import mmh3 - -from pyfury.buffer import Buffer -from pyfury.meta.metastring import Encoding -from pyfury.resolver import ( - MapRefResolver, - NoRefResolver, - NULL_FLAG, - NOT_NULL_VALUE_FLAG, -) from pyfury._serializer import ( Serializer, SerializationContext, NOT_SUPPORT_CROSS_LANGUAGE, BufferObject, - PickleSerializer, - Numpy1DArraySerializer, - PyArraySerializer, PYINT_CLASS_ID, - PYFLOAT_CLASS_ID, PYBOOL_CLASS_ID, STRING_CLASS_ID, - PICKLE_CLASS_ID, NOT_NULL_STRING_FLAG, NOT_NULL_PYINT_FLAG, NOT_NULL_PYBOOL_FLAG, NO_CLASS_ID, - NoneSerializer, - _PickleStub, - PickleStrongCacheStub, - PICKLE_STRONG_CACHE_CLASS_ID, - PICKLE_CACHE_CLASS_ID, - PickleCacheStub, - SMALL_STRING_THRESHOLD, ) -from pyfury.type import ( - FuryType, - Int8Type, - Int16Type, - Int32Type, - Int64Type, - Float32Type, - Float64Type, - load_class, +from pyfury.buffer import Buffer +from pyfury.lib import mmh3 +from pyfury.resolver import ( + MapRefResolver, + NoRefResolver, + NULL_FLAG, + NOT_NULL_VALUE_FLAG, ) +from pyfury.type import FuryType from pyfury.util import is_little_endian, set_bit, get_bit, clear_bit try: @@ -122,8 +96,8 @@ class ClassInfo: "cls", "class_id", "serializer", - "class_name_bytes", - "type_tag_bytes", + "namespace_bytes", + "typename_bytes", ) def __init__( @@ -131,15 +105,15 @@ def __init__( cls: type = None, class_id: int = NO_CLASS_ID, serializer: Serializer = None, - class_name_bytes: bytes = None, - type_tag_bytes: bytes = None, + namespace_bytes: bytes = None, + typename_bytes: bytes = None, ): self.cls = cls self.class_id = class_id self.serializer = serializer - self.class_name_bytes = MetaStringBytes(class_name_bytes) - self.type_tag_bytes = ( - MetaStringBytes(type_tag_bytes) if type_tag_bytes 
else None + self.namespace_bytes = MetaStringBytes(namespace_bytes) + self.typename_bytes = ( + MetaStringBytes(typename_bytes) if typename_bytes else None ) def __repr__(self): @@ -149,458 +123,6 @@ def __repr__(self): ) -class ClassResolver: - __slots__ = ( - "fury", - "_type_id_to_class", - "_type_id_to_serializer", - "_type_id_and_cls_to_serializer", - "_type_tag_to_class_x_lang_map", - "_enum_str_to_str", - "_class_id_counter", - "_used_classes_id", - "_classes_info", - "_registered_id2_class_info", - "_hash_to_enum_string", - "_enum_str_to_class", - "_hash_to_classinfo", - "_dynamic_id_to_classinfo_list", - "_dynamic_id_to_enum_str_list", - "_serializer", - "_dynamic_write_string_id", - "_dynamic_written_enum_string", - ) - - _type_id_to_class: Dict[int, type] - _type_id_to_serializer: Dict[int, Serializer] - _type_id_and_cls_to_serializer: Dict[Tuple[int, type], Serializer] - _classes_info: Dict[type, "ClassInfo"] - - def __init__(self, fury): - self.fury = fury - self._type_id_to_class = dict() - self._type_id_to_serializer = dict() - self._type_id_and_cls_to_serializer = dict() - self._type_tag_to_class_x_lang_map = dict() - self._class_id_counter = PICKLE_CACHE_CLASS_ID + 1 - self._used_classes_id = set() - - self._classes_info = dict() - self._registered_id2_class_info = [] - - self._enum_str_to_str = dict() - self._enum_str_to_class = dict() - self._hash_to_enum_string = dict() - self._hash_to_classinfo = dict() - self._dynamic_id_to_classinfo_list = list() - self._dynamic_id_to_enum_str_list = list() - - self._serializer = None - self._dynamic_write_string_id = 0 - self._dynamic_written_enum_string = [] - - def initialize(self): - self.register_class(int, class_id=PYINT_CLASS_ID) - self.register_class(float, class_id=PYFLOAT_CLASS_ID) - self.register_class(bool, class_id=PYBOOL_CLASS_ID) - self.register_class(str, class_id=STRING_CLASS_ID) - self.register_class(_PickleStub, class_id=PICKLE_CLASS_ID) - self.register_class( - PickleStrongCacheStub, 
class_id=PICKLE_STRONG_CACHE_CLASS_ID - ) - self.register_class(PickleCacheStub, class_id=PICKLE_CACHE_CLASS_ID) - self._add_default_serializers() - - # `Union[type, TypeVar]` is not supported in py3.6 - def register_serializer(self, cls, serializer): - assert isinstance(cls, (type, TypeVar)), cls - type_id = serializer.get_xtype_id() - if type_id != NOT_SUPPORT_CROSS_LANGUAGE: - self._add_x_lang_serializer(cls, serializer=serializer) - else: - self.register_class(cls) - self._classes_info[cls].serializer = serializer - - # `Union[type, TypeVar]` is not supported in py3.6 - def register_class(self, cls, *, class_id: int = None, type_tag: str = None): - """Register class with given type id or tag, if tag is not None, it will be used for - cross-language serialization.""" - if type_tag is not None: - assert class_id is None, ( - f"Type tag {type_tag} has been set already, " - f"set class id at the same time is not allowed." - ) - from pyfury._struct import ComplexObjectSerializer - - self.register_serializer( - cls, ComplexObjectSerializer(self.fury, cls, type_tag) - ) - return - classinfo = self._classes_info.get(cls) - if classinfo is None: - if isinstance(cls, TypeVar): - class_name_bytes = (cls.__module__ + "#" + cls.__name__).encode("utf-8") - else: - class_name_bytes = (cls.__module__ + "#" + cls.__qualname__).encode( - "utf-8" - ) - class_id = class_id if class_id is not None else self._next_class_id() - assert class_id not in self._used_classes_id, ( - self._used_classes_id, - self._classes_info, - ) - classinfo = ClassInfo( - cls=cls, class_name_bytes=class_name_bytes, class_id=class_id - ) - self._classes_info[cls] = classinfo - if len(self._registered_id2_class_info) <= class_id: - self._registered_id2_class_info.extend( - [None] * (class_id - len(self._registered_id2_class_info) + 1) - ) - self._registered_id2_class_info[class_id] = classinfo - else: - if classinfo.class_id == NO_CLASS_ID: - class_id = class_id if class_id is not None else 
self._next_class_id() - assert class_id not in self._used_classes_id, ( - self._used_classes_id, - self._classes_info, - ) - classinfo.class_id = class_id - if len(self._registered_id2_class_info) <= class_id: - self._registered_id2_class_info.extend( - [None] * (class_id - len(self._registered_id2_class_info) + 1) - ) - self._registered_id2_class_info[class_id] = classinfo - else: - if class_id is not None and classinfo.class_id != class_id: - raise ValueError( - f"Inconsistent class id {class_id} vs {classinfo.class_id} " - f"for class {cls}" - ) - - def _next_class_id(self): - class_id = self._class_id_counter = self._class_id_counter + 1 - while class_id in self._used_classes_id: - class_id = self._class_id_counter = self._class_id_counter + 1 - return class_id - - def _add_serializer(self, cls: type, serializer=None, serializer_cls=None): - if serializer_cls: - serializer = serializer_cls(self.fury, cls) - self.register_serializer(cls, serializer) - - def _add_x_lang_serializer(self, cls: type, serializer=None, serializer_cls=None): - if serializer_cls: - serializer = serializer_cls(self.fury, cls) - type_id = serializer.get_xtype_id() - from pyfury._serializer import NOT_SUPPORT_CROSS_LANGUAGE - - assert type_id != NOT_SUPPORT_CROSS_LANGUAGE - self._type_id_and_cls_to_serializer[(type_id, cls)] = serializer - self.register_class(cls) - classinfo = self._classes_info[cls] - classinfo.serializer = serializer - if type_id == FuryType.FURY_TYPE_TAG.value: - type_tag = serializer.get_xtype_tag() - assert type(type_tag) is str - assert type_tag not in self._type_tag_to_class_x_lang_map - classinfo.type_tag_bytes = MetaStringBytes(type_tag.encode("utf-8")) - self._type_tag_to_class_x_lang_map[type_tag] = cls - else: - self._type_id_to_serializer[type_id] = serializer - if type_id > NOT_SUPPORT_CROSS_LANGUAGE: - self._type_id_to_class[type_id] = cls - - def _add_default_serializers(self): - import pyfury.serializer as serializers - from pyfury._serializer import 
PyArraySerializer, Numpy1DArraySerializer - - self._add_x_lang_serializer(int, serializer_cls=serializers.ByteSerializer) - self._add_x_lang_serializer(int, serializer_cls=serializers.Int16Serializer) - self._add_x_lang_serializer(int, serializer_cls=serializers.Int32Serializer) - self._add_x_lang_serializer(int, serializer_cls=serializers.Int64Serializer) - self._add_x_lang_serializer(float, serializer_cls=serializers.FloatSerializer) - self._add_x_lang_serializer(float, serializer_cls=serializers.DoubleSerializer) - self._add_serializer(type(None), serializer_cls=NoneSerializer) - self._add_serializer(bool, serializer_cls=serializers.BooleanSerializer) - self._add_serializer(Int8Type, serializer_cls=serializers.ByteSerializer) - self._add_serializer(Int16Type, serializer_cls=serializers.Int16Serializer) - self._add_serializer(Int32Type, serializer_cls=serializers.Int32Serializer) - self._add_serializer(Int64Type, serializer_cls=serializers.Int64Serializer) - self._add_serializer(Float32Type, serializer_cls=serializers.FloatSerializer) - self._add_serializer(Float64Type, serializer_cls=serializers.DoubleSerializer) - self._add_serializer(str, serializer_cls=serializers.StringSerializer) - self._add_serializer(datetime.date, serializer_cls=serializers.DateSerializer) - self._add_serializer( - datetime.datetime, serializer_cls=serializers.TimestampSerializer - ) - self._add_serializer(bytes, serializer_cls=serializers.BytesSerializer) - self._add_serializer(list, serializer_cls=serializers.ListSerializer) - self._add_serializer(tuple, serializer_cls=serializers.TupleSerializer) - self._add_serializer(dict, serializer_cls=serializers.MapSerializer) - self._add_serializer(set, serializer_cls=serializers.SetSerializer) - self._add_serializer(enum.Enum, serializer_cls=serializers.EnumSerializer) - self._add_serializer(slice, serializer_cls=serializers.SliceSerializer) - from pyfury import PickleCacheSerializer, PickleStrongCacheSerializer - - self._add_serializer( - 
PickleStrongCacheStub, serializer=PickleStrongCacheSerializer(self.fury) - ) - self._add_serializer( - PickleCacheStub, serializer=PickleCacheSerializer(self.fury) - ) - try: - import pyarrow as pa - from pyfury.format.serializer import ( - ArrowRecordBatchSerializer, - ArrowTableSerializer, - ) - - self._add_serializer( - pa.RecordBatch, serializer_cls=ArrowRecordBatchSerializer - ) - self._add_serializer(pa.Table, serializer_cls=ArrowTableSerializer) - except Exception: - pass - for typecode in PyArraySerializer.typecode_dict.keys(): - self._add_serializer( - array.array, - serializer=PyArraySerializer(self.fury, array.array, typecode), - ) - self._add_serializer( - PyArraySerializer.typecodearray_type[typecode], - serializer=PyArraySerializer(self.fury, array.array, typecode), - ) - if np: - for dtype in Numpy1DArraySerializer.dtypes_dict.keys(): - self._add_serializer( - np.ndarray, - serializer=Numpy1DArraySerializer(self.fury, array.array, dtype), - ) - - def get_serializer(self, cls: type = None, type_id: int = None, obj=None): - """ - Returns - ------- - Returns or create serializer for the provided class - """ - assert cls is not None or type_id is not None or obj is not None - if obj is not None: - cls = type(obj) - if cls is int and 2**63 - 1 >= obj >= -(2**63): - type_id = FuryType.INT64.value - elif cls is float: - type_id = FuryType.DOUBLE.value - elif cls is array.array: - info = PyArraySerializer.typecode_dict.get(obj.typecode) - if info is not None: - type_id = info[1] - elif np and cls is np.ndarray and obj.ndim == 1: - info = Numpy1DArraySerializer.dtypes_dict.get(obj.dtype) - if info: - type_id = info[2] - if type_id is not None: - if cls is not None: - serializer_ = self._type_id_and_cls_to_serializer[(type_id, cls)] - else: - serializer_ = self._type_id_to_serializer[type_id] - else: - class_info = self._classes_info.get(cls) - if class_info is not None: - serializer_ = class_info.serializer - else: - self._add_serializer(cls, 
serializer=self.get_or_create_serializer(cls)) - serializer_ = self._classes_info.get(cls).serializer - self._serializer = serializer_ - return serializer_ - - def get_or_create_serializer(self, cls): - return self.get_or_create_classinfo(cls).serializer - - def get_or_create_classinfo(self, cls): - class_info = self._classes_info.get(cls) - if class_info is not None: - if class_info.serializer is not None: - return class_info - else: - class_info.serializer = self._create_serializer(cls) - return class_info - else: - serializer = self._create_serializer(cls) - class_id = ( - NO_CLASS_ID - if type(serializer) is not PickleSerializer - else PICKLE_CLASS_ID - ) - class_name_bytes = (cls.__module__ + "#" + cls.__qualname__).encode("utf-8") - class_info = ClassInfo( - cls=cls, - class_name_bytes=class_name_bytes, - serializer=serializer, - class_id=class_id, - ) - self._classes_info[cls] = class_info - return class_info - - def _create_serializer(self, cls): - mro = cls.__mro__ - classinfo_ = self._classes_info.get(cls) - for clz in mro: - class_info = self._classes_info.get(clz) - if ( - class_info - and class_info.serializer - and class_info.serializer.support_subclass() - ): - if classinfo_ is None or classinfo_.class_id == NO_CLASS_ID: - logger.info("Class %s not registered", cls) - serializer = type(class_info.serializer)(self.fury, cls) - break - else: - if dataclasses.is_dataclass(cls): - if classinfo_ is None or classinfo_.class_id == NO_CLASS_ID: - logger.info("Class %s not registered", cls) - logger.info("Class %s not registered", cls) - from pyfury import DataClassSerializer - - serializer = DataClassSerializer(self.fury, cls) - else: - serializer = PickleSerializer(self.fury, cls) - return serializer - - def write_classinfo(self, buffer: Buffer, classinfo: ClassInfo): - class_id = classinfo.class_id - if class_id != NO_CLASS_ID: - buffer.write_varint32(class_id << 1) - return - buffer.write_varint32(1) - self.write_enum_string_bytes(buffer, 
classinfo.class_name_bytes) - - def read_classinfo(self, buffer): - header = buffer.read_varint32() - if header & 0b1 == 0: - class_id = header >> 1 - classinfo = self._registered_id2_class_info[class_id] - if classinfo.serializer is None: - classinfo.serializer = self._create_serializer(classinfo.cls) - return classinfo - meta_str_header = buffer.read_varint32() - length = meta_str_header >> 1 - if meta_str_header & 0b1 != 0: - return self._dynamic_id_to_classinfo_list[length - 1] - class_name_bytes_hash = buffer.read_int64() - reader_index = buffer.reader_index - buffer.check_bound(reader_index, length) - buffer.reader_index = reader_index + length - classinfo = self._hash_to_classinfo.get(class_name_bytes_hash) - if classinfo is None: - classname_bytes = buffer.get_bytes(reader_index, length) - full_class_name = classname_bytes.decode(encoding="utf-8") - cls = load_class(full_class_name) - classinfo = self.get_or_create_classinfo(cls) - self._hash_to_classinfo[class_name_bytes_hash] = classinfo - self._dynamic_id_to_classinfo_list.append(classinfo) - return classinfo - - def write_enum_string_bytes( - self, buffer: Buffer, enum_string_bytes: MetaStringBytes - ): - dynamic_write_string_id = enum_string_bytes.dynamic_write_string_id - if dynamic_write_string_id == DEFAULT_DYNAMIC_WRITE_STRING_ID: - dynamic_write_string_id = self._dynamic_write_string_id - enum_string_bytes.dynamic_write_string_id = dynamic_write_string_id - self._dynamic_write_string_id += 1 - self._dynamic_written_enum_string.append(enum_string_bytes) - buffer.write_varint32(enum_string_bytes.length << 1) - if enum_string_bytes.length <= SMALL_STRING_THRESHOLD: - # TODO(chaokunyang) support meta string encoding - buffer.write_int8(Encoding.UTF_8.value) - else: - buffer.write_int64(enum_string_bytes.hashcode) - buffer.write_bytes(enum_string_bytes.data) - else: - buffer.write_varint32(((dynamic_write_string_id + 1) << 1) | 1) - - def read_enum_string_bytes(self, buffer: Buffer) -> MetaStringBytes: 
- header = buffer.read_varint32() - length = header >> 1 - if header & 0b1 != 0: - return self._dynamic_id_to_enum_str_list[length - 1] - if length <= SMALL_STRING_THRESHOLD: - buffer.read_int8() - if length <= 8: - v1 = buffer.read_bytes_as_int64(length) - v2 = 0 - else: - v1 = buffer.read_int64() - v2 = buffer.read_bytes_as_int64(length - 8) - hashcode = v1 * 31 + v2 - enum_str = self._hash_to_enum_string.get(hashcode) - if enum_str is None: - str_bytes = buffer.get_bytes(buffer.reader_index - length, length) - enum_str = MetaStringBytes(str_bytes, hashcode=hashcode) - self._hash_to_enum_string[hashcode] = enum_str - else: - hashcode = buffer.read_int64() - reader_index = buffer.reader_index - buffer.check_bound(reader_index, length) - buffer.reader_index = reader_index + length - enum_str = self._hash_to_enum_string.get(hashcode) - if enum_str is None: - str_bytes = buffer.get_bytes(reader_index, length) - enum_str = MetaStringBytes(str_bytes, hashcode=hashcode) - self._hash_to_enum_string[hashcode] = enum_str - self._dynamic_id_to_enum_str_list.append(enum_str) - return enum_str - - def xwrite_class(self, buffer, cls): - class_name_bytes = self._classes_info[cls].class_name_bytes - self.write_enum_string_bytes(buffer, class_name_bytes) - - def xwrite_type_tag(self, buffer, cls): - type_tag_bytes = self._classes_info[cls].type_tag_bytes - self.write_enum_string_bytes(buffer, type_tag_bytes) - - def read_class_by_type_tag(self, buffer): - tag = self.xread_classname(buffer) - return self._type_tag_to_class_x_lang_map[tag] - - def xread_class(self, buffer): - class_name_bytes = self.read_enum_string_bytes(buffer) - cls = self._enum_str_to_class.get(class_name_bytes) - if cls is None: - full_class_name = class_name_bytes.data.decode(encoding="utf-8") - cls = load_class(full_class_name) - self._enum_str_to_class[class_name_bytes] = cls - return cls - - def xread_classname(self, buffer) -> str: - str_bytes = self.read_enum_string_bytes(buffer) - str_ = 
self._enum_str_to_str.get(str_bytes) - if str_ is None: - str_ = str_bytes.data.decode(encoding="utf-8") - self._enum_str_to_str[str_bytes] = str_ - return str_ - - def get_class_by_type_id(self, type_id: int): - return self._type_id_to_class[type_id] - - def reset(self): - self.reset_write() - self.reset_read() - - def reset_read(self): - self._dynamic_id_to_classinfo_list.clear() - self._dynamic_id_to_enum_str_list.clear() - - def reset_write(self): - if self._dynamic_write_string_id != 0: - self._dynamic_write_string_id = 0 - for enum_str in self._dynamic_written_enum_string: - enum_str.dynamic_write_string_id = DEFAULT_DYNAMIC_WRITE_STRING_ID - self._dynamic_written_enum_string.clear() - - class Language(enum.Enum): XLANG = 0 JAVA = 1 @@ -689,8 +211,8 @@ def register_serializer(self, cls: type, serializer): self.class_resolver.register_serializer(cls, serializer) # `Union[type, TypeVar]` is not supported in py3.6 - def register_class(self, cls, *, class_id: int = None, type_tag: str = None): - self.class_resolver.register_class(cls, class_id=class_id, type_tag=type_tag) + def register_type(self, cls, *, class_id: int = None, type_tag: str = None): + self.class_resolver.register_type(cls, class_id=class_id, type_tag=type_tag) def serialize( self, diff --git a/python/pyfury/_registry.py b/python/pyfury/_registry.py new file mode 100644 index 0000000000..7ace2f9fc7 --- /dev/null +++ b/python/pyfury/_registry.py @@ -0,0 +1,503 @@ +import array +import dataclasses +import datetime +import enum +import logging +import sys +from typing import Dict, Tuple, TypeVar + +from pyfury.serializer import ( + Serializer, + NOT_SUPPORT_CROSS_LANGUAGE, + PickleSerializer, + Numpy1DArraySerializer, + PyArraySerializer, + PYINT_CLASS_ID, + PYFLOAT_CLASS_ID, + PYBOOL_CLASS_ID, + STRING_CLASS_ID, + PICKLE_CLASS_ID, + NO_CLASS_ID, + NoneSerializer, + _PickleStub, + PickleStrongCacheStub, + PICKLE_STRONG_CACHE_CLASS_ID, + PICKLE_CACHE_CLASS_ID, + PickleCacheStub, + 
SMALL_STRING_THRESHOLD, +) +from pyfury.buffer import Buffer +from pyfury.meta.metastring import Encoding +from pyfury.type import ( + FuryType, + Int8Type, + Int16Type, + Int32Type, + Int64Type, + Float32Type, + Float64Type, + load_class, +) + +try: + import numpy as np +except ImportError: + np = None + + +logger = logging.getLogger(__name__) + + +DEFAULT_DYNAMIC_WRITE_STRING_ID = -1 + + +class ClassResolver: + __slots__ = ( + "fury", + "_type_id_to_class", + "_type_id_to_serializer", + "_type_id_and_cls_to_serializer", + "_type_tag_to_class_x_lang_map", + "_enum_str_to_str", + "_class_id_counter", + "_used_classes_id", + "_classes_info", + "_registered_id2_class_info", + "_hash_to_enum_string", + "_enum_str_to_class", + "_hash_to_classinfo", + "_dynamic_id_to_classinfo_list", + "_dynamic_id_to_enum_str_list", + "_serializer", + "_dynamic_write_string_id", + "_dynamic_written_enum_string", + ) + + _type_id_to_class: Dict[int, type] + _type_id_to_serializer: Dict[int, Serializer] + _type_id_and_cls_to_serializer: Dict[Tuple[int, type], Serializer] + _classes_info: Dict[type, "ClassInfo"] + + def __init__(self, fury): + self.fury = fury + self._type_id_to_class = dict() + self._type_id_to_serializer = dict() + self._type_id_and_cls_to_serializer = dict() + self._type_tag_to_class_x_lang_map = dict() + self._class_id_counter = PICKLE_CACHE_CLASS_ID + 1 + self._used_classes_id = set() + + self._classes_info = dict() + self._registered_id2_class_info = [] + + self._enum_str_to_str = dict() + self._enum_str_to_class = dict() + self._hash_to_enum_string = dict() + self._hash_to_classinfo = dict() + self._dynamic_id_to_classinfo_list = list() + self._dynamic_id_to_enum_str_list = list() + + self._serializer = None + self._dynamic_write_string_id = 0 + self._dynamic_written_enum_string = [] + + def initialize(self): + self.register_type(int, class_id=PYINT_CLASS_ID) + self.register_type(float, class_id=PYFLOAT_CLASS_ID) + self.register_type(bool, class_id=PYBOOL_CLASS_ID) 
+ self.register_type(str, class_id=STRING_CLASS_ID) + self.register_type(_PickleStub, class_id=PICKLE_CLASS_ID) + self.register_type( + PickleStrongCacheStub, class_id=PICKLE_STRONG_CACHE_CLASS_ID + ) + self.register_type(PickleCacheStub, class_id=PICKLE_CACHE_CLASS_ID) + self._add_default_serializers() + + # `Union[type, TypeVar]` is not supported in py3.6 + def register_serializer(self, cls, serializer): + assert isinstance(cls, (type, TypeVar)), cls + type_id = serializer.get_xtype_id() + if type_id != NOT_SUPPORT_CROSS_LANGUAGE: + self._add_x_lang_serializer(cls, serializer=serializer) + else: + self.register_type(cls) + self._classes_info[cls].serializer = serializer + + # `Union[type, TypeVar]` is not supported in py3.6 + def register_type(self, cls, *, class_id: int = None, type_tag: str = None): + """Register class with given type id or tag, if tag is not None, it will be used for + cross-language serialization.""" + if type_tag is not None: + assert class_id is None, ( + f"Type tag {type_tag} has been set already, " + f"set class id at the same time is not allowed." 
+ ) + from pyfury._struct import ComplexObjectSerializer + + self.register_serializer( + cls, ComplexObjectSerializer(self.fury, cls, type_tag) + ) + return + classinfo = self._classes_info.get(cls) + if classinfo is None: + if isinstance(cls, TypeVar): + class_name_bytes = (cls.__module__ + "#" + cls.__name__).encode("utf-8") + else: + class_name_bytes = (cls.__module__ + "#" + cls.__qualname__).encode( + "utf-8" + ) + class_id = class_id if class_id is not None else self._next_class_id() + assert class_id not in self._used_classes_id, ( + self._used_classes_id, + self._classes_info, + ) + classinfo = ClassInfo( + cls=cls, class_name_bytes=class_name_bytes, class_id=class_id + ) + self._classes_info[cls] = classinfo + if len(self._registered_id2_class_info) <= class_id: + self._registered_id2_class_info.extend( + [None] * (class_id - len(self._registered_id2_class_info) + 1) + ) + self._registered_id2_class_info[class_id] = classinfo + else: + if classinfo.class_id == NO_CLASS_ID: + class_id = class_id if class_id is not None else self._next_class_id() + assert class_id not in self._used_classes_id, ( + self._used_classes_id, + self._classes_info, + ) + classinfo.class_id = class_id + if len(self._registered_id2_class_info) <= class_id: + self._registered_id2_class_info.extend( + [None] * (class_id - len(self._registered_id2_class_info) + 1) + ) + self._registered_id2_class_info[class_id] = classinfo + else: + if class_id is not None and classinfo.class_id != class_id: + raise ValueError( + f"Inconsistent class id {class_id} vs {classinfo.class_id} " + f"for class {cls}" + ) + + def _next_class_id(self): + class_id = self._class_id_counter = self._class_id_counter + 1 + while class_id in self._used_classes_id: + class_id = self._class_id_counter = self._class_id_counter + 1 + return class_id + + def _add_serializer(self, cls: type, serializer=None, serializer_cls=None): + if serializer_cls: + serializer = serializer_cls(self.fury, cls) + 
self.register_serializer(cls, serializer) + + def _add_x_lang_serializer(self, cls: type, serializer=None, serializer_cls=None): + if serializer_cls: + serializer = serializer_cls(self.fury, cls) + type_id = serializer.get_xtype_id() + from pyfury._serializer import NOT_SUPPORT_CROSS_LANGUAGE + + assert type_id != NOT_SUPPORT_CROSS_LANGUAGE + self._type_id_and_cls_to_serializer[(type_id, cls)] = serializer + self.register_type(cls) + classinfo = self._classes_info[cls] + classinfo.serializer = serializer + if type_id == FuryType.FURY_TYPE_TAG.value: + type_tag = serializer.get_xtype_tag() + assert type(type_tag) is str + assert type_tag not in self._type_tag_to_class_x_lang_map + classinfo.type_tag_bytes = MetaStringBytes(type_tag.encode("utf-8")) + self._type_tag_to_class_x_lang_map[type_tag] = cls + else: + self._type_id_to_serializer[type_id] = serializer + if type_id > NOT_SUPPORT_CROSS_LANGUAGE: + self._type_id_to_class[type_id] = cls + + def _add_default_serializers(self): + import pyfury.serializer as serializers + from pyfury._serializer import PyArraySerializer, Numpy1DArraySerializer + + self._add_x_lang_serializer(int, serializer_cls=serializers.ByteSerializer) + self._add_x_lang_serializer(int, serializer_cls=serializers.Int16Serializer) + self._add_x_lang_serializer(int, serializer_cls=serializers.Int32Serializer) + self._add_x_lang_serializer(int, serializer_cls=serializers.Int64Serializer) + self._add_x_lang_serializer(float, serializer_cls=serializers.FloatSerializer) + self._add_x_lang_serializer(float, serializer_cls=serializers.DoubleSerializer) + self._add_serializer(type(None), serializer_cls=NoneSerializer) + self._add_serializer(bool, serializer_cls=serializers.BooleanSerializer) + self._add_serializer(Int8Type, serializer_cls=serializers.ByteSerializer) + self._add_serializer(Int16Type, serializer_cls=serializers.Int16Serializer) + self._add_serializer(Int32Type, serializer_cls=serializers.Int32Serializer) + self._add_serializer(Int64Type, 
serializer_cls=serializers.Int64Serializer) + self._add_serializer(Float32Type, serializer_cls=serializers.FloatSerializer) + self._add_serializer(Float64Type, serializer_cls=serializers.DoubleSerializer) + self._add_serializer(str, serializer_cls=serializers.StringSerializer) + self._add_serializer(datetime.date, serializer_cls=serializers.DateSerializer) + self._add_serializer( + datetime.datetime, serializer_cls=serializers.TimestampSerializer + ) + self._add_serializer(bytes, serializer_cls=serializers.BytesSerializer) + self._add_serializer(list, serializer_cls=serializers.ListSerializer) + self._add_serializer(tuple, serializer_cls=serializers.TupleSerializer) + self._add_serializer(dict, serializer_cls=serializers.MapSerializer) + self._add_serializer(set, serializer_cls=serializers.SetSerializer) + self._add_serializer(enum.Enum, serializer_cls=serializers.EnumSerializer) + self._add_serializer(slice, serializer_cls=serializers.SliceSerializer) + from pyfury import PickleCacheSerializer, PickleStrongCacheSerializer + + self._add_serializer( + PickleStrongCacheStub, serializer=PickleStrongCacheSerializer(self.fury) + ) + self._add_serializer( + PickleCacheStub, serializer=PickleCacheSerializer(self.fury) + ) + try: + import pyarrow as pa + from pyfury.format.serializer import ( + ArrowRecordBatchSerializer, + ArrowTableSerializer, + ) + + self._add_serializer( + pa.RecordBatch, serializer_cls=ArrowRecordBatchSerializer + ) + self._add_serializer(pa.Table, serializer_cls=ArrowTableSerializer) + except Exception: + pass + for typecode in PyArraySerializer.typecode_dict.keys(): + self._add_serializer( + array.array, + serializer=PyArraySerializer(self.fury, array.array, typecode), + ) + self._add_serializer( + PyArraySerializer.typecodearray_type[typecode], + serializer=PyArraySerializer(self.fury, array.array, typecode), + ) + if np: + for dtype in Numpy1DArraySerializer.dtypes_dict.keys(): + self._add_serializer( + np.ndarray, + 
serializer=Numpy1DArraySerializer(self.fury, array.array, dtype), + ) + + def get_serializer(self, cls: type = None, type_id: int = None, obj=None): + """ + Returns + ------- + Returns or create serializer for the provided class + """ + assert cls is not None or type_id is not None or obj is not None + if obj is not None: + cls = type(obj) + if cls is int and 2**63 - 1 >= obj >= -(2**63): + type_id = FuryType.INT64.value + elif cls is float: + type_id = FuryType.DOUBLE.value + elif cls is array.array: + info = PyArraySerializer.typecode_dict.get(obj.typecode) + if info is not None: + type_id = info[1] + elif np and cls is np.ndarray and obj.ndim == 1: + info = Numpy1DArraySerializer.dtypes_dict.get(obj.dtype) + if info: + type_id = info[2] + if type_id is not None: + if cls is not None: + serializer_ = self._type_id_and_cls_to_serializer[(type_id, cls)] + else: + serializer_ = self._type_id_to_serializer[type_id] + else: + class_info = self._classes_info.get(cls) + if class_info is not None: + serializer_ = class_info.serializer + else: + self._add_serializer(cls, serializer=self.get_or_create_serializer(cls)) + serializer_ = self._classes_info.get(cls).serializer + self._serializer = serializer_ + return serializer_ + + def get_or_create_serializer(self, cls): + return self.get_or_create_classinfo(cls).serializer + + def get_or_create_classinfo(self, cls): + class_info = self._classes_info.get(cls) + if class_info is not None: + if class_info.serializer is not None: + return class_info + else: + class_info.serializer = self._create_serializer(cls) + return class_info + else: + serializer = self._create_serializer(cls) + class_id = ( + NO_CLASS_ID + if type(serializer) is not PickleSerializer + else PICKLE_CLASS_ID + ) + class_name_bytes = (cls.__module__ + "#" + cls.__qualname__).encode("utf-8") + class_info = ClassInfo( + cls=cls, + class_name_bytes=class_name_bytes, + serializer=serializer, + class_id=class_id, + ) + self._classes_info[cls] = class_info + return 
class_info + + def _create_serializer(self, cls): + mro = cls.__mro__ + classinfo_ = self._classes_info.get(cls) + for clz in mro: + class_info = self._classes_info.get(clz) + if ( + class_info + and class_info.serializer + and class_info.serializer.support_subclass() + ): + if classinfo_ is None or classinfo_.class_id == NO_CLASS_ID: + logger.info("Class %s not registered", cls) + serializer = type(class_info.serializer)(self.fury, cls) + break + else: + if dataclasses.is_dataclass(cls): + if classinfo_ is None or classinfo_.class_id == NO_CLASS_ID: + logger.info("Class %s not registered", cls) + logger.info("Class %s not registered", cls) + from pyfury import DataClassSerializer + + serializer = DataClassSerializer(self.fury, cls) + else: + serializer = PickleSerializer(self.fury, cls) + return serializer + + def write_classinfo(self, buffer: Buffer, classinfo: ClassInfo): + class_id = classinfo.class_id + if class_id != NO_CLASS_ID: + buffer.write_varint32(class_id << 1) + return + buffer.write_varint32(1) + self.write_enum_string_bytes(buffer, classinfo.class_name_bytes) + + def read_classinfo(self, buffer): + header = buffer.read_varint32() + if header & 0b1 == 0: + class_id = header >> 1 + classinfo = self._registered_id2_class_info[class_id] + if classinfo.serializer is None: + classinfo.serializer = self._create_serializer(classinfo.cls) + return classinfo + meta_str_header = buffer.read_varint32() + length = meta_str_header >> 1 + if meta_str_header & 0b1 != 0: + return self._dynamic_id_to_classinfo_list[length - 1] + class_name_bytes_hash = buffer.read_int64() + reader_index = buffer.reader_index + buffer.check_bound(reader_index, length) + buffer.reader_index = reader_index + length + classinfo = self._hash_to_classinfo.get(class_name_bytes_hash) + if classinfo is None: + classname_bytes = buffer.get_bytes(reader_index, length) + full_class_name = classname_bytes.decode(encoding="utf-8") + cls = load_class(full_class_name) + classinfo = 
self.get_or_create_classinfo(cls) + self._hash_to_classinfo[class_name_bytes_hash] = classinfo + self._dynamic_id_to_classinfo_list.append(classinfo) + return classinfo + + def write_enum_string_bytes( + self, buffer: Buffer, enum_string_bytes: MetaStringBytes + ): + dynamic_write_string_id = enum_string_bytes.dynamic_write_string_id + if dynamic_write_string_id == DEFAULT_DYNAMIC_WRITE_STRING_ID: + dynamic_write_string_id = self._dynamic_write_string_id + enum_string_bytes.dynamic_write_string_id = dynamic_write_string_id + self._dynamic_write_string_id += 1 + self._dynamic_written_enum_string.append(enum_string_bytes) + buffer.write_varint32(enum_string_bytes.length << 1) + if enum_string_bytes.length <= SMALL_STRING_THRESHOLD: + # TODO(chaokunyang) support meta string encoding + buffer.write_int8(Encoding.UTF_8.value) + else: + buffer.write_int64(enum_string_bytes.hashcode) + buffer.write_bytes(enum_string_bytes.data) + else: + buffer.write_varint32(((dynamic_write_string_id + 1) << 1) | 1) + + def read_enum_string_bytes(self, buffer: Buffer) -> MetaStringBytes: + header = buffer.read_varint32() + length = header >> 1 + if header & 0b1 != 0: + return self._dynamic_id_to_enum_str_list[length - 1] + if length <= SMALL_STRING_THRESHOLD: + buffer.read_int8() + if length <= 8: + v1 = buffer.read_bytes_as_int64(length) + v2 = 0 + else: + v1 = buffer.read_int64() + v2 = buffer.read_bytes_as_int64(length - 8) + hashcode = v1 * 31 + v2 + enum_str = self._hash_to_enum_string.get(hashcode) + if enum_str is None: + str_bytes = buffer.get_bytes(buffer.reader_index - length, length) + enum_str = MetaStringBytes(str_bytes, hashcode=hashcode) + self._hash_to_enum_string[hashcode] = enum_str + else: + hashcode = buffer.read_int64() + reader_index = buffer.reader_index + buffer.check_bound(reader_index, length) + buffer.reader_index = reader_index + length + enum_str = self._hash_to_enum_string.get(hashcode) + if enum_str is None: + str_bytes = buffer.get_bytes(reader_index, 
length) + enum_str = MetaStringBytes(str_bytes, hashcode=hashcode) + self._hash_to_enum_string[hashcode] = enum_str + self._dynamic_id_to_enum_str_list.append(enum_str) + return enum_str + + def xwrite_class(self, buffer, cls): + class_name_bytes = self._classes_info[cls].class_name_bytes + self.write_enum_string_bytes(buffer, class_name_bytes) + + def xwrite_type_tag(self, buffer, cls): + type_tag_bytes = self._classes_info[cls].type_tag_bytes + self.write_enum_string_bytes(buffer, type_tag_bytes) + + def read_class_by_type_tag(self, buffer): + tag = self.xread_classname(buffer) + return self._type_tag_to_class_x_lang_map[tag] + + def xread_class(self, buffer): + class_name_bytes = self.read_enum_string_bytes(buffer) + cls = self._enum_str_to_class.get(class_name_bytes) + if cls is None: + full_class_name = class_name_bytes.data.decode(encoding="utf-8") + cls = load_class(full_class_name) + self._enum_str_to_class[class_name_bytes] = cls + return cls + + def xread_classname(self, buffer) -> str: + str_bytes = self.read_enum_string_bytes(buffer) + str_ = self._enum_str_to_str.get(str_bytes) + if str_ is None: + str_ = str_bytes.data.decode(encoding="utf-8") + self._enum_str_to_str[str_bytes] = str_ + return str_ + + def get_class_by_type_id(self, type_id: int): + return self._type_id_to_class[type_id] + + def reset(self): + self.reset_write() + self.reset_read() + + def reset_read(self): + self._dynamic_id_to_classinfo_list.clear() + self._dynamic_id_to_enum_str_list.clear() + + def reset_write(self): + if self._dynamic_write_string_id != 0: + self._dynamic_write_string_id = 0 + for enum_str in self._dynamic_written_enum_string: + enum_str.dynamic_write_string_id = DEFAULT_DYNAMIC_WRITE_STRING_ID + self._dynamic_written_enum_string.clear() diff --git a/python/pyfury/_serialization.pyx b/python/pyfury/_serialization.pyx index ce1443c65f..99604169f5 100644 --- a/python/pyfury/_serialization.pyx +++ b/python/pyfury/_serialization.pyx @@ -35,11 +35,12 @@ from 
pyfury._fury import _PicklerStub, _UnpicklerStub, Pickler, Unpickler from pyfury._fury import _ENABLE_CLASS_REGISTRATION_FORCIBLY from pyfury.error import ClassNotCompatibleError from pyfury.lib import mmh3 -from pyfury.meta.metastring import Encoding +from pyfury.meta.metastring import Encoding, MetaStringEncoder, MetaStringDecoder from pyfury.type import is_primitive_type, FuryType, Int8Type, Int16Type, Int32Type, \ Int64Type, Float32Type, Float64Type, Int16ArrayType, Int32ArrayType, \ Int64ArrayType, Float32ArrayType, Float64ArrayType, infer_field, load_class from pyfury.util import is_little_endian +from pyfury.includes.libserialization cimport TypeId, IsNamespacedType from libc.stdint cimport * from libcpp.vector cimport vector @@ -309,7 +310,7 @@ cdef class ClassResolver: dict _type_id_to_serializer # Dict[int, Serializer] dict _type_id_and_cls_to_serializer # Dict[Tuple[int, type], Serializer] dict _type_tag_to_class_x_lang_map - int16_t _class_id_counter + int16_t _type_id_counter set _used_classes_id public list _registered_id2_class_info @@ -319,6 +320,7 @@ cdef class ClassResolver: flat_hash_map[uint64_t, PyObject*] _c_classes_info # hash -> ClassInfo flat_hash_map[int64_t, PyObject*] _c_hash_to_classinfo + flat_hash_map[pair[int64_t, int64_t], PyObject*] _c_2hash_to_classinfo # hash -> MetaStringBytes flat_hash_map[int64_t, PyObject*] _c_hash_to_enum_string_bytes flat_hash_map[pair[int64_t, int64_t], PyObject*] _c_hash_to_small_metastring_bytes @@ -346,139 +348,50 @@ cdef class ClassResolver: self._type_id_to_serializer = dict() self._type_id_and_cls_to_serializer = dict() self._type_tag_to_class_x_lang_map = dict() - self._class_id_counter = PICKLE_CACHE_CLASS_ID + 1 + self._type_id_counter = PICKLE_CACHE_CLASS_ID + 1 self._used_classes_id = set() self._registered_id2_class_info = list() - self.dynamic_write_string_id = 0 - self._serializer = None - self._classes_info = dict() self._class_set = set() self._classname_set = set() self._enum_str_set = 
set() - + self._generic_encoder = MetaStringEncoder('.', '_') + self._generic_decoder = MetaStringDecoder('.', '_') + self._typename_decoder = MetaStringDecoder('$', '_') + self._typename_decoder = MetaStringDecoder('$', '_') + +# if classinfo.dynamic_type: + # if cls is int and 2**63 - 1 >= obj >= -(2**63): + # type_id = FuryType.INT64.value + # elif cls is float: + # type_id = FuryType.DOUBLE.value + # elif cls is array.array: + # info = PyArraySerializer.typecode_dict.get(obj.typecode) + # if info is not None: + # type_id = info[1] + # elif np and cls is np.ndarray and obj.ndim == 1: + # info = Numpy1DArraySerializer.dtypes_dict.get(obj.dtype) + # if info: + # type_id = info[2] + # //???? + # self._add_serializer(cls, serializer=self.get_or_create_serializer(cls)) + # serializer_ = self._classes_info.get(cls).serializer + # return serializer_ def initialize(self): - self.register_class(int, class_id=PYINT_CLASS_ID) - self.register_class(float, class_id=PYFLOAT_CLASS_ID) - self.register_class(bool, class_id=PYBOOL_CLASS_ID) - self.register_class(str, class_id=STRING_CLASS_ID) - self.register_class(_PickleStub, class_id=PICKLE_CLASS_ID) - self.register_class(PickleStrongCacheStub, class_id=PICKLE_STRONG_CACHE_CLASS_ID) - self.register_class(PickleCacheStub, class_id=PICKLE_CACHE_CLASS_ID) - self._add_default_serializers() - - def register_serializer(self, cls: Union[type, TypeVar], serializer): - assert isinstance(cls, (type, TypeVar)), cls - type_id = serializer.get_xtype_id() - if type_id != NOT_SUPPORT_CROSS_LANGUAGE: - self._add_x_lang_serializer(cls, serializer=serializer) - else: - self.register_class(cls) - self._classes_info[cls].serializer = serializer - - def register_class(self, cls: Union[type, TypeVar], *, class_id: int = None, type_tag: str = None): - """Register class with given type id or tag, if tag is not None, it will be used for - cross-language serialization.""" - if type_tag is not None: - assert class_id is None, (f"Type tag {type_tag} has 
been set already, " - f"set class id at the same time is not allowed.") - self.register_serializer( - cls, ComplexObjectSerializer(self.fury, cls, type_tag) - ) - return - classinfo = self._classes_info.get(cls) - if classinfo is None: - if isinstance(cls, TypeVar): - class_name_bytes = (cls.__module__ + "#" + cls.__name__).encode("utf-8") - else: - class_name_bytes = (cls.__module__ + "#" + cls.__qualname__) \ - .encode("utf-8") - class_id = class_id if class_id is not None else self._next_class_id() - assert class_id not in self._used_classes_id, ( - self._used_classes_id, - self._classes_info, - ) - classinfo = ClassInfo( - cls=cls, class_name_bytes=class_name_bytes, class_id=class_id - ) - self._classes_info[cls] = classinfo - self._c_classes_info[cls] = classinfo - if len(self._registered_id2_class_info) <= class_id: - self._registered_id2_class_info.extend( - [None] * (class_id - len(self._registered_id2_class_info) + 1) - ) - self._registered_id2_class_info[class_id] = classinfo - self._c_registered_id2_class_info.resize(class_id + 1) - self._c_registered_id2_class_info[class_id] = classinfo - else: - if classinfo.class_id == NO_CLASS_ID: - class_id = class_id if class_id is not None else self._next_class_id() - assert class_id not in self._used_classes_id, ( - self._used_classes_id, - self._classes_info, - ) - classinfo.class_id = class_id - if len(self._registered_id2_class_info) <= class_id: - self._registered_id2_class_info.extend( - [None] * (class_id - len(self._registered_id2_class_info) + 1) - ) - self._registered_id2_class_info[class_id] = classinfo - self._c_registered_id2_class_info.resize(class_id + 1) - self._c_registered_id2_class_info[class_id] = classinfo - else: - if class_id is not None and classinfo.class_id != class_id: - raise ValueError( - f"Inconsistent class id {class_id} vs {classinfo.class_id} " - f"for class {cls}" - ) - - def _next_class_id(self): - class_id = self._class_id_counter = self._class_id_counter + 1 - while class_id in 
self._used_classes_id: - class_id = self._class_id_counter = self._class_id_counter + 1 - return class_id - - def _add_serializer( - self, - cls: Union[type, TypeVar], - serializer=None, - serializer_cls=None): - if serializer_cls: - serializer = serializer_cls(self.fury, cls) - self.register_serializer(cls, serializer) - - def _add_x_lang_serializer(self, - cls: Union[type, TypeVar], - serializer=None, - serializer_cls=None): - if serializer_cls: - serializer = serializer_cls(self.fury, cls) - type_id = serializer.get_xtype_id() - - assert type_id != NOT_SUPPORT_CROSS_LANGUAGE - self._type_id_and_cls_to_serializer[(type_id, cls)] = serializer - self.register_class(cls) - classinfo = self._classes_info[cls] - classinfo.serializer = serializer - if type_id == FuryType.FURY_TYPE_TAG.value: - type_tag = serializer.get_xtype_tag() - assert type(type_tag) is str - assert type_tag not in self._type_tag_to_class_x_lang_map - classinfo.type_tag_bytes = MetaStringBytes(type_tag.encode("utf-8")) - self._type_tag_to_class_x_lang_map[type_tag] = cls + if self.fury.language == Language.PYTHON: + self._initialize_py() else: - self._type_id_to_serializer[type_id] = serializer - if type_id > NOT_SUPPORT_CROSS_LANGUAGE: - self._type_id_to_class[type_id] = cls - - def _add_default_serializers(self): - self._add_x_lang_serializer(int, serializer_cls=ByteSerializer) - self._add_x_lang_serializer(int, serializer_cls=Int16Serializer) - self._add_x_lang_serializer(int, serializer_cls=Int32Serializer) - self._add_x_lang_serializer(int, serializer_cls=Int64Serializer) - self._add_x_lang_serializer(float, serializer_cls=FloatSerializer) - self._add_x_lang_serializer(float, serializer_cls=DoubleSerializer) + self._initialize_xlang() + + def _initialize_py(self): + self.register_type(int, type_id=PYINT_CLASS_ID) + self.register_type(float, type_id=PYFLOAT_CLASS_ID) + self.register_type(bool, type_id=PYBOOL_CLASS_ID) + self.register_type(str, type_id=STRING_CLASS_ID) + 
self.register_type(_PickleStub, type_id=PICKLE_CLASS_ID) + self.register_type(PickleStrongCacheStub, type_id=PICKLE_STRONG_CACHE_CLASS_ID) + self.register_type(PickleCacheStub, type_id=PICKLE_CACHE_CLASS_ID) self._add_serializer(type(None), serializer_cls=NoneSerializer) self._add_serializer(bool, serializer_cls=BooleanSerializer) self._add_serializer(Int8Type, serializer_cls=ByteSerializer) @@ -524,54 +437,199 @@ cdef class ClassResolver: array.array, serializer=PyArraySerializer(self.fury, array.array, typecode), ) - self._add_serializer( - PyArraySerializer.typecodearray_type[typecode], + self._add_serializer(array.array, DynamicPyArraySerializer) + if np: + self._add_serializer(np.ndarray, NDArraySerializer) + + def _initialize_xlang(self): + self.register_type(bool, type_id=FuryType.BOOL, serializer=BooleanSerializer(self.fury, bool)) + self.register_type(Int8Type, type_id=FuryType.INT8, serializer=ByteSerializer(self.fury, int)) + self.register_type(Int16Type, type_id=FuryType.INT16, serializer=Int16Serializer(self.fury, int)) + self.register_type(Int32Type, type_id=FuryType.INT32, serializer=Int32Serializer(self.fury, int)) + self.register_type(Int64Type, type_id=FuryType.INT64, serializer=Int64Serializer(self.fury, int)) + self.register_type(int, type_id=0, serializer=DynamicIntSerializer(self.fury, int)) + self.register_type(Float32Type, type_id=FuryType.FLOAT32, serializer=FloatSerializer(self.fury, float)) + self.register_type(Float64Type, type_id=FuryType.FLOAT64, serializer=FloatSerializer(self.fury, float)) + self.register_type(float, type_id=0, serializer=DynamicFloatSerializer(self.fury, float)) + self.register_type(str, type_id=FuryType.STRING, serializer=StringSerializer(self.fury, str)) + # TODO(chaokunyang) DURATION DECIMAL + self.register_type(datetime.datetime, type_id=FuryType.TIMESTAMP, serializer=TimestampSerializer(self.fury, datetime.datetime)) + self.register_type(datetime.date, type_id=FuryType.LOCAL_DATE, 
serializer=DateSerializer(self.fury, datetime.date)) + self.register_type(bytes, type_id=FuryType.BINARY, serializer=BytesSerializer(self.fury, bytes)) + for itemsize, ftype, typeid in PyArraySerializer.typecode_dict.values(): + self.register_type( + ftype, type_id=typeid, serializer=PyArraySerializer(self.fury, array.array, typecode), ) + self._add_serializer(array.array, serializer=DynamicPyArraySerializer) if np: - for dtype in Numpy1DArraySerializer.dtypes_dict.keys(): + for itemsize, format, ftype, typeid in Numpy1DArraySerializer.dtypes_dict.values(): self._add_serializer( np.ndarray, - serializer=Numpy1DArraySerializer(self.fury, array.array, dtype), + serializer=Numpy1DArraySerializer(self.fury, np.ndarray, dtype), + ) + self._add_serializer(np.ndarray, serializer=NDArraySerializer) + self.register_type(list, type_id=FuryType.LIST, serializer=ListSerializer) + self.register_type(set, type_id=FuryType.SET, serializer=SetSerializer) + self.register_type(dict, type_id=FuryType.MAP, serializer=MapSerializer) + + def register_type(self, cls: Union[type, TypeVar], *, type_id: int = None, + namespace: str = None, typename: str = None, serializer=None): + """Register class with given type id or typename. 
If typename is not None, it will be used for + cross-language serialization.""" + n_params = len(set(typename, type_id)) + if n_params == 0: + type_id = self._next_type_id() + if ln_params == 2: + raise TypeError(f"type name {typename} and id {type_id} should not be set at the same time") + type_id_registered = type_id is not None and + if type_id is not None: + if type_id in self._type_id_to_class: + raise TypeError(f"{cls} registered already") + elif cls in self._classes_info: + raise TypeError(f"{cls} registered already") + + def register_serializer(self, cls: Union[type, TypeVar], serializer): + assert isinstance(cls, (type, TypeVar)), cls + if cls not in self._classes_info: + raise TypeUnregisteredError(f"{cls} not registered") + classinfo = self._classes_info[cls] + type_id = prev_type_id = classinfo.type_id + self._type_id_to_class.pop(prev_type_id) + if type(classinfo.serializer) is not type(serializer): + if classinfo.typename_bytes is not None: + type_id = classinfo.type_id & 0xffffff00 | FuryType.NS_EXT.value + else: + type_id = classinfo.type_id & 0xffffff00 | FuryType.EXT.value + self._type_id_to_class[type_id] = classinfo + + def _register_xtype(self, cls: Union[type, TypeVar], *, type_id: int = None, + namespace: str = None, typename: str = None, serializer=None, internal=False): + if serializer is None: + if issubclass(cls, enum.Enum): + serializer = EnumSerializer(self.fury, cls) + type_id = FuryType.NS_ENUM if type_id is None else (type_id << 8 + FuryType.ENUM) + else: + serializer = ComplexObjectSerializer(self.fury, cls) + type_id = FuryType.NS_STRUCT if type_id is None else (type_id << 8 + FuryType.STRUCT) + elif not internal: + type_id = FuryType.NS_EXT if type_id is None else (type_id << 8 + FuryType.EXT) + if typename is None: + classinfo = ClassInfo(cls, type_id, serializer, None, None) + else: + classinfo = ClassInfo(cls, type_id, serializer, None, None) + classinfo = self._classes_info[cls] + classinfo.serializer = serializer + if type_id 
== FuryType.FURY_TYPE_TAG.value: + type_tag = serializer.get_xtype_tag() + assert type(type_tag) is str + assert type_tag not in self._type_tag_to_class_x_lang_map + classinfo.type_tag_bytes = MetaStringBytes(type_tag.encode("utf-8")) + self._type_tag_to_class_x_lang_map[type_tag] = cls + else: + self._type_id_to_serializer[type_id] = serializer + if type_id > NOT_SUPPORT_CROSS_LANGUAGE: + self._type_id_to_class[type_id] = cls + + + def _register_type(self, cls: Union[type, TypeVar], *, type_id: int = None, + namespace: str = None, typename: str = None): + if typename is not None: + assert type_id is None, (f"Type tag {typename} has been set already, " + f"set class id at the same time is not allowed.") + self.register_serializer( + cls, ComplexObjectSerializer(self.fury, cls, type_tag) + ) + return + classinfo = self._classes_info.get(cls) + if classinfo is None: + if isinstance(cls, TypeVar): + class_name_bytes = (cls.__module__ + "#" + cls.__name__).encode("utf-8") + else: + class_name_bytes = (cls.__module__ + "#" + cls.__qualname__) \ + .encode("utf-8") + type_id = type_id if type_id is not None else self._next_type_id() + assert type_id not in self._used_classes_id, ( + self._used_classes_id, + self._classes_info, + ) + classinfo = ClassInfo( + cls=cls, class_name_bytes=class_name_bytes, type_id=type_id + ) + self._classes_info[cls] = classinfo + self._c_classes_info[cls] = classinfo + if len(self._registered_id2_class_info) <= type_id: + self._registered_id2_class_info.extend( + [None] * (type_id - len(self._registered_id2_class_info) + 1) + ) + self._registered_id2_class_info[type_id] = classinfo + self._c_registered_id2_class_info.resize(type_id + 1) + self._c_registered_id2_class_info[type_id] = classinfo + else: + if classinfo.type_id == NO_CLASS_ID: + type_id = type_id if type_id is not None else self._next_type_id() + assert type_id not in self._used_classes_id, ( + self._used_classes_id, + self._classes_info, ) + classinfo.type_id = type_id + if 
len(self._registered_id2_class_info) <= type_id: + self._registered_id2_class_info.extend( + [None] * (type_id - len(self._registered_id2_class_info) + 1) + ) + self._registered_id2_class_info[type_id] = classinfo + self._c_registered_id2_class_info.resize(type_id + 1) + self._c_registered_id2_class_info[type_id] = classinfo + else: + if type_id is not None and classinfo.type_id != type_id: + raise ValueError( + f"Inconsistent class id {type_id} vs {classinfo.type_id} " + f"for class {cls}" + ) + + def _next_type_id(self): + type_id = self._type_id_counter = self._type_id_counter + 1 + while type_id in self._used_classes_id: + type_id = self._type_id_counter = self._type_id_counter + 1 + return type_id - cpdef inline Serializer get_serializer(self, cls=None, type_id=None, obj=None): + def _add_serializer( + self, + cls: Union[type, TypeVar], + serializer=None, + serializer_cls=None): + if serializer_cls: + serializer = serializer_cls(self.fury, cls) + self.register_serializer(cls, serializer) + + cpdef inline Serializer get_serializer(self, cls=None, obj=None): """ Returns ------- Returns or create serializer for the provided class """ - assert cls is not None or type_id is not None or obj is not None + return get_classinfo(cls=cls, obj=obj).serializer - cdef Serializer serializer_ - if obj is not None: + cpdef inline ClassInfo get_classinfo(self, cls=None, obj=None): + if cls is None: cls = type(obj) - if cls is int and 2**63 - 1 >= obj >= -(2**63): - type_id = FuryType.INT64.value - elif cls is float: - type_id = FuryType.DOUBLE.value - elif cls is array.array: - info = PyArraySerializer.typecode_dict.get(obj.typecode) - if info is not None: - type_id = info[1] - elif np and cls is np.ndarray and obj.ndim == 1: - info = Numpy1DArraySerializer.dtypes_dict.get(obj.dtype) - if info: - type_id = info[2] - if type_id is not None: - if cls is not None: - serializer_ = self._type_id_and_cls_to_serializer[(type_id, cls)] - else: - serializer_ = 
self._type_id_to_serializer[type_id] - else: - class_info = self._classes_info.get(cls) - if class_info is not None: - serializer_ = class_info.serializer - else: - self._add_serializer(cls, serializer=self.get_or_create_serializer(cls)) - serializer_ = self._classes_info.get(cls).serializer - self._serializer = serializer_ - return serializer_ + cdef PyObject* py_type_ptr = cls + cdef PyObject* typeinfo_ptr = self._py_type_ptr_to_typeinfo[py_type_ptr] + if typeinfo_ptr is NULL: + typeinfo_ptr = self._init_classinfo(cls) + # If dynamic_type is true, the serializer will be a dynamic typed serializer + # and it will write type info when writing the data. + # In such cases, the `write_classinfo` should not write typeinfo. + # In general, if we have 4 type for one class, we will have 5 serializers. + # For example, we have int8/16/32/64/128 for python `int` type, then we have 6 serializers + # for python `int`: `Int8/1632/64/128Serializer` for `int8/16/32/64/128` each, and another + # `IntSerializer` for `int` which will dispatch to different `int8/16/32/64/128` type + # according the actual value. + # We do not get the acutal type here, because it will introduce extra computing. + # For example, we have want to get actual `Int8/16/32/64Serializer`, we must check and + # extract the actutal here which will introduce cost, and we will do same thing again + # when serializing the actual data. 
+ return typeinfo_ptr cpdef inline Serializer get_or_create_serializer(self, cls): return self.get_or_create_classinfo(cls).serializer @@ -589,13 +647,13 @@ cdef class ClassResolver: else: serializer = self._create_serializer(cls) if type(serializer) is PickleSerializer: - class_id = PICKLE_CLASS_ID + type_id = PICKLE_CLASS_ID else: - class_id = NO_CLASS_ID + type_id = NO_CLASS_ID class_name_bytes = (cls.__module__ + "#" + cls.__qualname__).encode("utf-8") class_info = ClassInfo( cls=cls, class_name_bytes=class_name_bytes, - serializer=serializer, class_id=class_id + serializer=serializer, type_id=type_id ) self._classes_info[cls] = class_info self._c_classes_info[cls] = class_info @@ -611,13 +669,13 @@ cdef class ClassResolver: and class_info.serializer and class_info.serializer.support_subclass() ): - if classinfo_ is None or classinfo_.class_id == NO_CLASS_ID: + if classinfo_ is None or classinfo_.type_id == NO_CLASS_ID: logger.info("Class %s not registered", cls) serializer = type(class_info.serializer)(self.fury, cls) break else: if dataclasses.is_dataclass(cls): - if classinfo_ is None or classinfo_.class_id == NO_CLASS_ID: + if classinfo_ is None or classinfo_.type_id == NO_CLASS_ID: logger.info("Class %s not registered", cls) from pyfury import DataClassSerializer @@ -627,24 +685,51 @@ cdef class ClassResolver: return serializer cpdef inline write_classinfo(self, Buffer buffer, ClassInfo classinfo): - cdef int32_t class_id = classinfo.class_id - if class_id != NO_CLASS_ID: - buffer.write_varint32((class_id << 1)) + cdef int32_t type_id = classinfo.type_id + if type_id != NO_CLASS_ID: + buffer.write_varint32((type_id << 1)) return buffer.write_varint32(1) self._write_enum_string_bytes(buffer, classinfo.class_name_bytes) + cpdef inline ClassInfo read_typeinfo(self, Buffer buffer): + cdef int32_t type_id = buffer.read_varuint32() + assert type_id != 0 + cdef MetaStringBytes namespace_bytes, typename_bytes + if IsNamespacedType(type_id): + namespace_bytes = 
self.class_resolver._read_enum_string_bytes(buffer) + typename_bytes = self.class_resolver._read_enum_string_bytes(buffer) + return self._load_bytes_to_classinfo(type_id, namespace_bytes, typename_bytes) + return self.xtype_id_to_class_map.get(xtypeId) + + cdef inline ClassInfo _load_bytes_to_classinfo( + self, int32_t type_id, MetaStringBytes namespace_bytes, MetaStringBytes typename_bytes): + cdef PyObject* classinfo_ptr = self._c_2hash_to_classinfo[ + pair[int64_t, int64_t](namespace_bytes.hashcode, typename_bytes.hashcode)] + if classinfo_ptr != NULL: + return classinfo_ptr + namespace = namespace_bytes.decode(PACKAGE_DECODER) + typename = typename_bytes.decode(TYPE_NAME_DECODER) + qualified_name = ".".join((namespace, typename)) + classinfo = self._qualified_name_to_class_info.get(qualified_name) + if classinfo is None: + raise TypeUnregisteredError(f"{qualified_name} not registered") + classinfo_ptr = classinfo + self._c_2hash_to_classinfo[pair[int64_t, int64_t]( + namespace_bytes.hashcode, typename_bytes.hashcode)] = classinfo_ptr + return classinfo + cpdef inline ClassInfo read_classinfo(self, Buffer buffer): cdef int32_t h1 = buffer.read_varint32() - cdef int32_t class_id = h1 >> 1 + cdef int32_t type_id = h1 >> 1 cdef ClassInfo classinfo cdef PyObject* classinfo_ptr # registered class id are greater than `NO_CLASS_ID`. 
if h1 & 0b1 == 0: - assert class_id >= 0, class_id - classinfo_ptr = self._c_registered_id2_class_info[class_id] + assert type_id >= 0, type_id + classinfo_ptr = self._c_registered_id2_class_info[type_id] if classinfo_ptr == NULL: - raise ValueError(f"Unexpected class_id {class_id} " + raise ValueError(f"Unexpected type_id {type_id} " f"{self._registered_id2_class_info}") classinfo = classinfo_ptr if classinfo.serializer is None: @@ -673,11 +758,11 @@ cdef class ClassResolver: cdef inline _write_enum_string_bytes( self, Buffer buffer, MetaStringBytes enum_string_bytes): - cdef int16_t dynamic_class_id = enum_string_bytes.dynamic_write_string_id + cdef int16_t dynamic_type_id = enum_string_bytes.dynamic_write_string_id cdef int32_t length = enum_string_bytes.length - if dynamic_class_id == DEFAULT_DYNAMIC_WRITE_STRING_ID: - dynamic_class_id = self.dynamic_write_string_id - enum_string_bytes.dynamic_write_string_id = dynamic_class_id + if dynamic_type_id == DEFAULT_DYNAMIC_WRITE_STRING_ID: + dynamic_type_id = self.dynamic_write_string_id + enum_string_bytes.dynamic_write_string_id = dynamic_type_id self.dynamic_write_string_id += 1 self._c_dynamic_written_enum_string.push_back(enum_string_bytes) buffer.write_varint32(length << 1) @@ -687,7 +772,7 @@ cdef class ClassResolver: buffer.write_int64(enum_string_bytes.hashcode) buffer.write_bytes(enum_string_bytes.data) else: - buffer.write_varint32(((dynamic_class_id + 1) << 1) | 1) + buffer.write_varint32(((dynamic_type_id + 1) << 1) | 1) cdef inline MetaStringBytes _read_enum_string_bytes(self, Buffer buffer): cdef int32_t header = buffer.read_varint32() @@ -729,45 +814,6 @@ cdef class ClassResolver: self._c_dynamic_id_to_enum_string_vec.push_back(enum_str_ptr) return enum_str_ptr - cpdef inline xwrite_class(self, Buffer buffer, cls): - cdef PyObject* classinfo_ptr = self._c_classes_info[cls] - assert classinfo_ptr != NULL - cdef MetaStringBytes class_name_bytes = (classinfo_ptr).class_name_bytes - 
self._write_enum_string_bytes(buffer, class_name_bytes) - - cpdef inline xwrite_type_tag(self, Buffer buffer, cls): - cdef PyObject* classinfo_ptr = self._c_classes_info[cls] - assert classinfo_ptr != NULL - cdef MetaStringBytes type_tag_bytes = (classinfo_ptr).type_tag_bytes - self._write_enum_string_bytes(buffer, type_tag_bytes) - - cpdef inline read_class_by_type_tag(self, Buffer buffer): - tag = self.xread_classname(buffer) - return self._type_tag_to_class_x_lang_map[tag] - - cpdef inline xread_class(self, Buffer buffer): - cdef MetaStringBytes str_bytes = self._read_enum_string_bytes(buffer) - cdef uint64_t object_id = str_bytes - cdef PyObject* cls_ptr = self._c_str_bytes_to_class[object_id] - if cls_ptr != NULL: - return cls_ptr - cdef str full_class_name = str_bytes.data.decode(encoding="utf-8") - cls = load_class(full_class_name) - self._c_str_bytes_to_class[object_id] = cls - self._class_set.add(cls) - return cls - - cpdef inline str xread_classname(self, Buffer buffer): - cdef MetaStringBytes str_bytes = self._read_enum_string_bytes(buffer) - cdef uint64_t object_id = str_bytes - cdef PyObject* classname_ptr = self._c_enum_str_to_str[object_id] - if classname_ptr != NULL: - return classname_ptr - cdef str full_class_name = str_bytes.data.decode(encoding="utf-8") - self._c_enum_str_to_str[object_id] = full_class_name - self._classname_set.add(full_class_name) - return full_class_name - cpdef inline get_class_by_type_id(self, int32_t type_id): return self._type_id_to_class[type_id] @@ -817,30 +863,28 @@ cdef class MetaStringBytes: @cython.final cdef class ClassInfo: cdef public object cls - cdef public int16_t class_id + cdef public int16_t type_id cdef public Serializer serializer - cdef public MetaStringBytes class_name_bytes - cdef public MetaStringBytes type_tag_bytes + cdef public MetaStringBytes namespace_bytes + cdef public MetaStringBytes typename_bytes + cdef c_bool dynamic_type def __init__( self, cls: Union[type, TypeVar] = None, - class_id: int 
= NO_CLASS_ID, + type_id: int = NO_CLASS_ID, serializer: Serializer = None, - class_name_bytes: bytes = None, - type_tag_bytes: bytes = None, + namespace_bytes: bytes = None, + typename_bytes: bytes = None, ): self.cls = cls - self.class_id = class_id + self.type_id = type_id self.serializer = serializer - self.class_name_bytes = MetaStringBytes(class_name_bytes) - if type_tag_bytes is None: - self.type_tag_bytes = None - else: - self.type_tag_bytes = MetaStringBytes(type_tag_bytes) + self.namespace_bytes = MetaStringBytes(namespace_bytes) + self.type_tag_bytes = MetaStringBytes(typename_bytes) def __repr__(self): - return f"ClassInfo(cls={self.cls}, class_id={self.class_id}, " \ + return f"ClassInfo(cls={self.cls}, type_id={self.type_id}, " \ f"serializer={self.serializer})" @@ -910,9 +954,9 @@ cdef class Fury: def register_serializer(self, cls: Union[type, TypeVar], Serializer serializer): self.class_resolver.register_serializer(cls, serializer) - def register_class(self, cls: Union[type, TypeVar], *, - class_id: int = None, type_tag: str = None): - self.class_resolver.register_class(cls, class_id=class_id, type_tag=type_tag) + def register_type(self, cls: Union[type, TypeVar], *, + type_id: int = None, type_tag: str = None): + self.class_resolver.register_type(cls, type_id=type_id, type_tag=type_tag) def serialize( self, obj, @@ -970,21 +1014,7 @@ cdef class Fury: if self.language == Language.PYTHON: self.serialize_ref(buffer, obj) else: - start_offset = buffer.writer_index - # preserve 4-byte for nativeObjects start offsets. - buffer.write_int32(-1) - # preserve 4-byte for nativeObjects size - buffer.write_int32(-1) self.xserialize_ref(buffer, obj) - buffer.put_int32(start_offset, buffer.writer_index) - buffer.put_int32(start_offset + 4, len(self._native_objects)) - self.ref_resolver.reset_write() - # fury write opaque object classname which cause later write of classname - # only write an id. 
- self.class_resolver.reset_write() - for native_object in self._native_objects: - self.serialize_ref(buffer, native_object) - self.reset_write() if buffer is not self.buffer: return buffer else: @@ -1056,27 +1086,8 @@ cdef class Fury: cpdef inline xserialize_nonref( self, Buffer buffer, obj, Serializer serializer=None): - cls = type(obj) - serializer = serializer or self.class_resolver.get_serializer(obj=obj) - cdef int16_t type_id = serializer.get_xtype_id() - buffer.write_int16(type_id) - if type_id != NOT_SUPPORT_CROSS_LANGUAGE: - if type_id == FuryType.FURY_TYPE_TAG.value: - self.class_resolver.xwrite_type_tag(buffer, cls) - if type_id < NOT_SUPPORT_CROSS_LANGUAGE: - self.class_resolver.xwrite_class(buffer, cls) - serializer.xwrite(buffer, obj) - else: - # Write classname so it can be used for debugging which object doesn't - # support cross-language. - # TODO add a config to disable this to reduce space cost. - self.class_resolver.xwrite_class(buffer, cls) - # serializer may increase reference id multi times internally, thus peer - # cross-language later fields/objects deserialization will use wrong - # reference id since we skip opaque objects deserialization. - # So we stash native objects and serialize all those object at the last. 
- buffer.write_varint32(len(self._native_objects)) - self._native_objects.append(obj) + serializer = serializer or self.class_resolver.write_typeinfo(obj) + serializer.xwrite(buffer, obj) def deserialize( self, @@ -1182,20 +1193,18 @@ cdef class Fury: return buffer.read_double() return classinfo.serializer.read(buffer) - cpdef inline xdeserialize_ref( - self, Buffer buffer, Serializer serializer=None): + cpdef inline xdeserialize_ref(self, Buffer buffer, Serializer serializer=None): cdef MapRefResolver ref_resolver - cdef int32_t red_id + cdef int32_t ref_id if serializer is None or serializer.need_to_write_ref: ref_resolver = self.ref_resolver - red_id = ref_resolver.try_preserve_ref_id(buffer) - + ref_id = ref_resolver.try_preserve_ref_id(buffer) # indicates that the object is first read. - if red_id >= NOT_NULL_VALUE_FLAG: + if ref_id >= NOT_NULL_VALUE_FLAG: o = self.xdeserialize_nonref( buffer, serializer=serializer ) - ref_resolver.set_read_object(red_id, o) + ref_resolver.set_read_object(ref_id, o) return o else: return ref_resolver.get_read_object() @@ -1208,37 +1217,9 @@ cdef class Fury: cpdef inline xdeserialize_nonref( self, Buffer buffer, Serializer serializer=None): - cdef int16_t type_id = buffer.read_int16() - cls = None - if type_id != NOT_SUPPORT_CROSS_LANGUAGE: - if type_id == FuryType.FURY_TYPE_TAG.value: - cls = self.class_resolver.read_class_by_type_tag(buffer) - if type_id < NOT_SUPPORT_CROSS_LANGUAGE: - if self._peer_language is not Language.PYTHON: - self.class_resolver.xread_classname(buffer) - cls = self.class_resolver.get_class_by_type_id(-type_id) - serializer = serializer or self.class_resolver.get_serializer( - type_id=-type_id - ) - else: - cls = self.class_resolver.xread_class(buffer) - serializer = serializer or self.class_resolver.get_serializer( - cls=cls, type_id=type_id - ) - else: - if type_id != FuryType.FURY_TYPE_TAG.value: - cls = self.class_resolver.get_class_by_type_id(type_id) - serializer = serializer or 
self.class_resolver.get_serializer( - cls=cls, type_id=type_id - ) - assert cls is not None - return serializer.xread(buffer) - cdef str class_name = self.class_resolver.xread_classname(buffer) - cdef int32_t ordinal = buffer.read_varint32() - if self._peer_language != Language.PYTHON: - return OpaqueObject(self._peer_language, class_name, ordinal) - else: - return self._native_objects[ordinal] + if serializer is None: + serializer = self.class_resolver.read_typeinfo(buffer).serializer + return serializer.xread(buffer) cpdef inline write_buffer_object(self, Buffer buffer, BufferObject buffer_object): if self._buffer_callback is not None and self._buffer_callback(buffer_object): @@ -1409,28 +1390,6 @@ cdef class Serializer: self.type_ = type_ self.need_to_write_ref = not is_primitive_type(type_) - cpdef int16_t get_xtype_id(self): - """ - Returns - ------- - Returns NOT_SUPPORT_CROSS_LANGUAGE if the serializer doesn't - support cross-language serialization. - Return a number in range (0, 32767) if the serializer support - cross-language serialization and native serialization data is the - same with cross-language serialization. - Return a negative short in range [-32768, 0) if the serializer - support cross-language serialization and native serialization data - is not the same with cross-language serialization. - """ - return NOT_SUPPORT_CROSS_LANGUAGE - - cpdef str get_xtype_tag(self): - """ - Returns - ------- - a type tag used for setup type mapping between languages. 
- """ - cpdef write(self, Buffer buffer, value): raise NotImplementedError @@ -1467,9 +1426,6 @@ cdef class CrossLanguageCompatibleSerializer(Serializer): @cython.final cdef class BooleanSerializer(CrossLanguageCompatibleSerializer): - cpdef inline int16_t get_xtype_id(self): - return FuryType.BOOL.value - cpdef inline write(self, Buffer buffer, value): buffer.write_bool(value) @@ -1495,9 +1451,6 @@ cdef class NoneSerializer(Serializer): @cython.final cdef class ByteSerializer(CrossLanguageCompatibleSerializer): - cpdef inline int16_t get_xtype_id(self): - return FuryType.INT8.value - cpdef inline write(self, Buffer buffer, value): buffer.write_int8(value) @@ -1507,9 +1460,6 @@ cdef class ByteSerializer(CrossLanguageCompatibleSerializer): @cython.final cdef class Int16Serializer(CrossLanguageCompatibleSerializer): - cpdef inline int16_t get_xtype_id(self): - return FuryType.INT16.value - cpdef inline write(self, Buffer buffer, value): buffer.write_int16(value) @@ -1519,9 +1469,6 @@ cdef class Int16Serializer(CrossLanguageCompatibleSerializer): @cython.final cdef class Int32Serializer(CrossLanguageCompatibleSerializer): - cpdef inline int16_t get_xtype_id(self): - return FuryType.INT32.value - cpdef inline write(self, Buffer buffer, value): buffer.write_int32(value) @@ -1531,9 +1478,6 @@ cdef class Int32Serializer(CrossLanguageCompatibleSerializer): @cython.final cdef class Int64Serializer(CrossLanguageCompatibleSerializer): - cpdef inline int16_t get_xtype_id(self): - return FuryType.INT64.value - cpdef inline xwrite(self, Buffer buffer, value): buffer.write_int64(value) @@ -1549,9 +1493,6 @@ cdef class Int64Serializer(CrossLanguageCompatibleSerializer): @cython.final cdef class FloatSerializer(CrossLanguageCompatibleSerializer): - cpdef inline int16_t get_xtype_id(self): - return FuryType.FLOAT.value - cpdef inline write(self, Buffer buffer, value): buffer.write_float(value) @@ -1561,9 +1502,6 @@ cdef class FloatSerializer(CrossLanguageCompatibleSerializer): 
@cython.final cdef class DoubleSerializer(CrossLanguageCompatibleSerializer): - cpdef inline int16_t get_xtype_id(self): - return FuryType.DOUBLE.value - cpdef inline write(self, Buffer buffer, value): buffer.write_double(value) @@ -1573,9 +1511,6 @@ cdef class DoubleSerializer(CrossLanguageCompatibleSerializer): @cython.final cdef class StringSerializer(CrossLanguageCompatibleSerializer): - cpdef inline int16_t get_xtype_id(self): - return FuryType.STRING.value - cpdef inline write(self, Buffer buffer, value): buffer.write_string(value) @@ -1588,9 +1523,6 @@ cdef _base_date = datetime.date(1970, 1, 1) @cython.final cdef class DateSerializer(CrossLanguageCompatibleSerializer): - cpdef inline int16_t get_xtype_id(self): - return FuryType.DATE32.value - cpdef inline write(self, Buffer buffer, value): if type(value) is not datetime.date: raise TypeError( @@ -1608,9 +1540,6 @@ cdef class DateSerializer(CrossLanguageCompatibleSerializer): @cython.final cdef class TimestampSerializer(CrossLanguageCompatibleSerializer): - cpdef inline int16_t get_xtype_id(self): - return FuryType.TIMESTAMP.value - cpdef inline write(self, Buffer buffer, value): if type(value) is not datetime.datetime: raise TypeError( @@ -1628,9 +1557,6 @@ cdef class TimestampSerializer(CrossLanguageCompatibleSerializer): @cython.final cdef class BytesSerializer(CrossLanguageCompatibleSerializer): - cpdef inline int16_t get_xtype_id(self): - return FuryType.BINARY.value - cpdef inline write(self, Buffer buffer, value): self.fury.write_buffer_object(buffer, BytesBufferObject(value)) @@ -1662,9 +1588,6 @@ cdef class CollectionSerializer(Serializer): self.ref_resolver = fury.ref_resolver self.elem_serializer = elem_serializer - cpdef int16_t get_xtype_id(self): - return -FuryType.LIST.value - cdef pair[int8_t, int64_t] write_header(self, Buffer buffer, value): cdef int8_t collect_flag = COLLECTION_DEFAULT_FLAG elem_type = type(next(iter(value))) @@ -1819,9 +1742,6 @@ cdef class 
CollectionSerializer(Serializer): cdef class ListSerializer(CollectionSerializer): - cpdef int16_t get_xtype_id(self): - return FuryType.LIST.value - cpdef read(self, Buffer buffer): cdef MapRefResolver ref_resolver = self.fury.ref_resolver cdef ClassResolver class_resolver = self.fury.class_resolver @@ -1954,15 +1874,9 @@ cdef class StringArraySerializer(ListSerializer): def __init__(self, fury, type_): super().__init__(fury, type_, StringSerializer(fury, str)) - cpdef inline int16_t get_xtype_id(self): - return FuryType.FURY_STRING_ARRAY.value - @cython.final cdef class SetSerializer(CollectionSerializer): - cpdef inline int16_t get_xtype_id(self): - return FuryType.FURY_SET.value - cpdef inline read(self, Buffer buffer): cdef MapRefResolver ref_resolver = self.fury.ref_resolver cdef ClassResolver class_resolver = self.fury.class_resolver @@ -2041,9 +1955,6 @@ cdef class MapSerializer(Serializer): self.key_serializer = key_serializer self.value_serializer = value_serializer - cpdef inline int16_t get_xtype_id(self): - return FuryType.MAP.value - cpdef inline write(self, Buffer buffer, o): cdef dict value = o buffer.write_varint32(len(value)) @@ -2234,218 +2145,6 @@ cdef class SubMapSerializer(Serializer): return map_ -# Use numpy array or python array module. -typecode_dict = { - # use bytes serializer for byte array. 
- "h": (2, FuryType.FURY_PRIMITIVE_SHORT_ARRAY.value), - "i": (4, FuryType.FURY_PRIMITIVE_INT_ARRAY.value), - "l": (8, FuryType.FURY_PRIMITIVE_LONG_ARRAY.value), - "f": (4, FuryType.FURY_PRIMITIVE_FLOAT_ARRAY.value), - "d": (8, FuryType.FURY_PRIMITIVE_DOUBLE_ARRAY.value), -} -if np: - typecode_dict = { - k: (itemsize, -type_id) for k, (itemsize, type_id) in typecode_dict.items() - } - - -@cython.final -cdef class PyArraySerializer(CrossLanguageCompatibleSerializer): - typecode_dict = typecode_dict - typecodearray_type = { - "h": Int16ArrayType, - "i": Int32ArrayType, - "l": Int64ArrayType, - "f": Float32ArrayType, - "d": Float64ArrayType, - } - cdef str typecode - cdef int8_t itemsize - cdef int16_t type_id - - def __init__(self, fury, type_, str typecode): - super().__init__(fury, type_) - self.typecode = typecode - self.itemsize, self.type_id = PyArraySerializer.typecode_dict[self.typecode] - - cpdef int16_t get_xtype_id(self): - return self.type_id - - cpdef inline xwrite(self, Buffer buffer, value): - assert value.itemsize == self.itemsize - view = memoryview(value) - assert view.format == self.typecode - assert view.itemsize == self.itemsize - assert view.c_contiguous # TODO handle contiguous - cdef int32_t nbytes = len(value) * self.itemsize - buffer.write_varint32(nbytes) - buffer.write_buffer(value) - - cpdef inline xread(self, Buffer buffer): - data = buffer.read_bytes_and_size() - arr = array.array(self.typecode, []) - arr.frombytes(data) - return arr - - cpdef inline write(self, Buffer buffer, value: array.array): - cdef int32_t nbytes = len(value) * value.itemsize - buffer.write_string(value.typecode) - buffer.write_varint32(nbytes) - buffer.write_buffer(value) - - cpdef inline read(self, Buffer buffer): - typecode = buffer.read_string() - data = buffer.read_bytes_and_size() - arr = array.array(typecode, []) - arr.frombytes(data) - return arr - - -if np: - _np_dtypes_dict = { - # use bytes serializer for byte array. 
- np.dtype(np.bool_): (1, "?", FuryType.FURY_PRIMITIVE_BOOL_ARRAY.value), - np.dtype(np.int16): (2, "h", FuryType.FURY_PRIMITIVE_SHORT_ARRAY.value), - np.dtype(np.int32): (4, "i", FuryType.FURY_PRIMITIVE_INT_ARRAY.value), - np.dtype(np.int64): (8, "l", FuryType.FURY_PRIMITIVE_LONG_ARRAY.value), - np.dtype(np.float32): (4, "f", FuryType.FURY_PRIMITIVE_FLOAT_ARRAY.value), - np.dtype(np.float64): (8, "d", FuryType.FURY_PRIMITIVE_DOUBLE_ARRAY.value), - } -else: - _np_dtypes_dict = {} - - -@cython.final -cdef class Numpy1DArraySerializer(CrossLanguageCompatibleSerializer): - dtypes_dict = _np_dtypes_dict - cdef object dtype - cdef str typecode - cdef int8_t itemsize - cdef int16_t type_id - - def __init__(self, fury, type_, dtype): - super().__init__(fury, type_) - self.dtype = dtype - self.itemsize, self.typecode, self.type_id = _np_dtypes_dict[self.dtype] - - cpdef int16_t get_xtype_id(self): - return self.type_id - - cpdef inline xwrite(self, Buffer buffer, value): - assert value.itemsize == self.itemsize - view = memoryview(value) - try: - assert view.format == self.typecode - except AssertionError as e: - raise e - assert view.itemsize == self.itemsize - cdef int32_t nbytes = len(value) * self.itemsize - buffer.write_varint32(nbytes) - if self.dtype == np.dtype("bool") or not view.c_contiguous: - buffer.write_bytes(value.tobytes()) - else: - buffer.write_buffer(value) - - cpdef inline xread(self, Buffer buffer): - data = buffer.read_bytes_and_size() - return np.frombuffer(data, dtype=self.dtype) - - cpdef inline write(self, Buffer buffer, value): - self.fury.handle_unsupported_write(buffer, value) - - cpdef inline read(self, Buffer buffer): - return self.fury.handle_unsupported_read(buffer) - - -cdef _get_hash(Fury fury, list field_names, dict type_hints): - from pyfury._struct import StructHashVisitor - - visitor = StructHashVisitor(fury) - for index, key in enumerate(field_names): - infer_field(key, type_hints[key], visitor, types_path=[]) - hash_ = 
visitor.get_hash() - assert hash_ != 0 - return hash_ - - -cdef class ComplexObjectSerializer(Serializer): - cdef str _type_tag - cdef object _type_hints - cdef list _field_names - cdef list _serializers - cdef int32_t _hash - - def __init__(self, fury, clz: type, type_tag: str): - super().__init__(fury, clz) - self._type_tag = type_tag - self._type_hints = get_type_hints(clz) - self._field_names = sorted(self._type_hints.keys()) - self._serializers = [None] * len(self._field_names) - from pyfury._struct import ComplexTypeVisitor - - visitor = ComplexTypeVisitor(fury) - for index, key in enumerate(self._field_names): - serializer = infer_field(key, self._type_hints[key], visitor, types_path=[]) - self._serializers[index] = serializer - if self.fury.language == Language.PYTHON: - logger.warning( - "Type of class %s shouldn't be serialized using cross-language " - "serializer", - clz, - ) - self._hash = 0 - - cpdef int16_t get_xtype_id(self): - return FuryType.FURY_TYPE_TAG.value - - cpdef str get_xtype_tag(self): - return self._type_tag - - cpdef write(self, Buffer buffer, value): - return self.xwrite(buffer, value) - - cpdef read(self, Buffer buffer): - return self.xread(buffer) - - cpdef xwrite(self, Buffer buffer, value): - if self._hash == 0: - self._hash = _get_hash(self.fury, self._field_names, self._type_hints) - buffer.write_int32(self._hash) - cdef Serializer serializer - cdef int32_t index - for index, field_name in enumerate(self._field_names): - field_value = getattr(value, field_name) - serializer = self._serializers[index] - self.fury.xserialize_ref( - buffer, field_value, serializer=serializer - ) - - cpdef xread(self, Buffer buffer): - cdef int32_t hash_ = buffer.read_int32() - if self._hash == 0: - self._hash = _get_hash(self.fury, self._field_names, self._type_hints) - if hash_ != self._hash: - raise ClassNotCompatibleError( - f"Hash {hash_} is not consistent with {self._hash} " - f"for class {self.type_}", - ) - obj = 
self.type_.__new__(self.type_) - self.fury.ref_resolver.reference(obj) - cdef Serializer serializer - cdef int32_t index - for index, field_name in enumerate(self._field_names): - serializer = self._serializers[index] - field_value = self.fury.xdeserialize_ref( - buffer, serializer=serializer - ) - setattr( - obj, - field_name, - field_value, - ) - return obj - - @cython.final cdef class EnumSerializer(Serializer): @classmethod @@ -2522,18 +2221,3 @@ cdef class SliceSerializer(Serializer): cpdef xread(self, Buffer buffer): raise NotImplementedError - - -@cython.final -cdef class PickleSerializer(Serializer): - cpdef inline xwrite(self, Buffer buffer, value): - raise NotImplementedError - - cpdef inline xread(self, Buffer buffer): - raise NotImplementedError - - cpdef inline write(self, Buffer buffer, value): - self.fury.handle_unsupported_write(buffer, value) - - cpdef inline read(self, Buffer buffer): - return self.fury.handle_unsupported_read(buffer) diff --git a/python/pyfury/_serializer.py b/python/pyfury/_serializer.py index c6f39b1667..e62b68558c 100644 --- a/python/pyfury/_serializer.py +++ b/python/pyfury/_serializer.py @@ -118,29 +118,6 @@ def __init__(self, fury, type_: type): self.type_: type = type_ self.need_to_write_ref = not is_primitive_type(type_) - def get_xtype_id(self): - """ - Returns - ------- - Returns NOT_SUPPORT_CROSS_LANGUAGE if the serializer doesn't - support cross-language serialization. - Return a number in range (0, 32767) if the serializer support - cross-language serialization and native serialization data is the - same with cross-language serialization. - Return a negative short in range [-32768, 0) if the serializer - support cross-language serialization and native serialization data - is not the same with cross-language serialization. - """ - return NOT_SUPPORT_CROSS_LANGUAGE - - def get_xtype_tag(self): - """ - Returns - ------- - a type tag used for setup type mapping between languages. 
- """ - raise RuntimeError("Tag is only for struct.") - def write(self, buffer, value): raise NotImplementedError @@ -197,9 +174,6 @@ def read(self, buffer): class ByteSerializer(CrossLanguageCompatibleSerializer): - def get_xtype_id(self): - return FuryType.INT8.value - def write(self, buffer, value): buffer.write_int8(value) @@ -208,9 +182,6 @@ def read(self, buffer): class Int16Serializer(CrossLanguageCompatibleSerializer): - def get_xtype_id(self): - return FuryType.INT16.value - def write(self, buffer, value): buffer.write_int16(value) @@ -219,9 +190,6 @@ def read(self, buffer): class Int32Serializer(CrossLanguageCompatibleSerializer): - def get_xtype_id(self): - return FuryType.INT32.value - def write(self, buffer, value): buffer.write_int32(value) @@ -230,9 +198,6 @@ def read(self, buffer): class Int64Serializer(Serializer): - def get_xtype_id(self): - return FuryType.INT64.value - def xwrite(self, buffer, value): buffer.write_int64(value) @@ -247,9 +212,6 @@ def read(self, buffer): class FloatSerializer(CrossLanguageCompatibleSerializer): - def get_xtype_id(self): - return FuryType.FLOAT.value - def write(self, buffer, value): buffer.write_float(value) @@ -258,9 +220,6 @@ def read(self, buffer): class DoubleSerializer(CrossLanguageCompatibleSerializer): - def get_xtype_id(self): - return FuryType.DOUBLE.value - def write(self, buffer, value): buffer.write_double(value) @@ -269,9 +228,6 @@ def read(self, buffer): class StringSerializer(CrossLanguageCompatibleSerializer): - def get_xtype_id(self): - return FuryType.STRING.value - def write(self, buffer, value: str): buffer.write_string(value) @@ -283,9 +239,6 @@ def read(self, buffer): class DateSerializer(CrossLanguageCompatibleSerializer): - def get_xtype_id(self): - return FuryType.DATE32.value - def write(self, buffer, value: datetime.date): if not isinstance(value, datetime.date): raise TypeError( @@ -302,9 +255,6 @@ def read(self, buffer): class TimestampSerializer(CrossLanguageCompatibleSerializer): - 
def get_xtype_id(self): - return FuryType.TIMESTAMP.value - def write(self, buffer, value: datetime.datetime): if not isinstance(value, datetime.datetime): raise TypeError( @@ -321,9 +271,6 @@ def read(self, buffer): class BytesSerializer(CrossLanguageCompatibleSerializer): - def get_xtype_id(self): - return FuryType.BINARY.value - def write(self, buffer, value: bytes): assert isinstance(value, bytes) self.fury.write_buffer_object(buffer, BytesBufferObject(value)) @@ -333,117 +280,6 @@ def read(self, buffer): return fury_buf.to_pybytes() -# Use numpy array or python array module. -typecode_dict = { - # use bytes serializer for byte array. - "h": (2, FuryType.FURY_PRIMITIVE_SHORT_ARRAY.value), - "i": (4, FuryType.FURY_PRIMITIVE_INT_ARRAY.value), - "l": (8, FuryType.FURY_PRIMITIVE_LONG_ARRAY.value), - "f": (4, FuryType.FURY_PRIMITIVE_FLOAT_ARRAY.value), - "d": (8, FuryType.FURY_PRIMITIVE_DOUBLE_ARRAY.value), -} -if np: - typecode_dict = { - k: (itemsize, -type_id) for k, (itemsize, type_id) in typecode_dict.items() - } - - -class PyArraySerializer(CrossLanguageCompatibleSerializer): - typecode_dict = typecode_dict - typecodearray_type = { - "h": Int16ArrayType, - "i": Int32ArrayType, - "l": Int64ArrayType, - "f": Float32ArrayType, - "d": Float64ArrayType, - } - - def __init__(self, fury, type_, typecode): - super().__init__(fury, type_) - self.typecode = typecode - self.itemsize, self.type_id = PyArraySerializer.typecode_dict[self.typecode] - - def get_xtype_id(self): - return self.type_id - - def xwrite(self, buffer, value): - assert value.itemsize == self.itemsize - view = memoryview(value) - assert view.format == self.typecode - assert view.itemsize == self.itemsize - assert view.c_contiguous # TODO handle contiguous - nbytes = len(value) * self.itemsize - buffer.write_varint32(nbytes) - buffer.write_buffer(value) - - def xread(self, buffer): - data = buffer.read_bytes_and_size() - arr = array.array(self.typecode, []) - arr.frombytes(data) - return arr - - def 
write(self, buffer, value: array.array): - nbytes = len(value) * value.itemsize - buffer.write_string(value.typecode) - buffer.write_varint32(nbytes) - buffer.write_buffer(value) - - def read(self, buffer): - typecode = buffer.read_string() - data = buffer.read_bytes_and_size() - arr = array.array(typecode, []) - arr.frombytes(data) - return arr - - -if np: - _np_dtypes_dict = { - # use bytes serializer for byte array. - np.dtype(np.bool_): (1, "?", FuryType.FURY_PRIMITIVE_BOOL_ARRAY.value), - np.dtype(np.int16): (2, "h", FuryType.FURY_PRIMITIVE_SHORT_ARRAY.value), - np.dtype(np.int32): (4, "i", FuryType.FURY_PRIMITIVE_INT_ARRAY.value), - np.dtype(np.int64): (8, "l", FuryType.FURY_PRIMITIVE_LONG_ARRAY.value), - np.dtype(np.float32): (4, "f", FuryType.FURY_PRIMITIVE_FLOAT_ARRAY.value), - np.dtype(np.float64): (8, "d", FuryType.FURY_PRIMITIVE_DOUBLE_ARRAY.value), - } -else: - _np_dtypes_dict = {} - - -class Numpy1DArraySerializer(CrossLanguageCompatibleSerializer): - dtypes_dict = _np_dtypes_dict - - def __init__(self, fury, type_, dtype): - super().__init__(fury, type_) - self.dtype = dtype - self.itemsize, self.typecode, self.type_id = _np_dtypes_dict[self.dtype] - - def get_xtype_id(self): - return self.type_id - - def xwrite(self, buffer, value): - assert value.itemsize == self.itemsize - view = memoryview(value) - assert view.format == self.typecode - assert view.itemsize == self.itemsize - nbytes = len(value) * self.itemsize - buffer.write_varint32(nbytes) - if self.dtype == np.dtype("bool") or not view.c_contiguous: - buffer.write_bytes(value.tobytes()) - else: - buffer.write_buffer(value) - - def xread(self, buffer): - data = buffer.read_bytes_and_size() - return np.frombuffer(data, dtype=self.dtype) - - def write(self, buffer, value): - self.fury.handle_unsupported_write(buffer, value) - - def read(self, buffer): - return self.fury.handle_unsupported_read(buffer) - - class CollectionSerializer(Serializer): __slots__ = "class_resolver", "ref_resolver", 
"elem_serializer" diff --git a/python/pyfury/_struct.py b/python/pyfury/_struct.py index 0591b3076b..25287d7138 100644 --- a/python/pyfury/_struct.py +++ b/python/pyfury/_struct.py @@ -123,12 +123,6 @@ def __init__(self, fury, clz: type, type_tag: str): ) self._hash = 0 - def get_xtype_id(self): - return FuryType.FURY_TYPE_TAG.value - - def get_xtype_tag(self): - return self._type_tag - def write(self, buffer, value): return self.xwrite(buffer, value) @@ -176,16 +170,16 @@ def __init__( def visit_list(self, field_name, elem_type, types_path=None): # TODO add list element type to hash. - id_ = abs(ListSerializer(self.fury, list).get_xtype_id()) - self._hash = self._compute_field_hash(self._hash, id_) + xtype_id = self.fury.class_resolver.get_classinfo(list).class_id + self._hash = self._compute_field_hash(self._hash, abs(xtype_id)) def visit_dict(self, field_name, key_type, value_type, types_path=None): # TODO add map key/value type to hash. - id_ = abs(MapSerializer(self.fury, dict).get_xtype_id()) - self._hash = self._compute_field_hash(self._hash, id_) + xtype_id = self.fury.class_resolver.get_classinfo(dict).class_id + self._hash = self._compute_field_hash(self._hash, abs(xtype_id)) def visit_customized(self, field_name, type_, types_path=None): - serializer = self.fury.class_resolver.get_serializer(type_) + xtype_id = self.fury.class_resolver.get_classinfo(type_).class_id if serializer.get_xtype_id() != NOT_SUPPORT_CROSS_LANGUAGE: tag = serializer.get_xtype_tag() else: diff --git a/python/pyfury/_util.pxd b/python/pyfury/_util.pxd index 77f76aceb8..b7e679012c 100644 --- a/python/pyfury/_util.pxd +++ b/python/pyfury/_util.pxd @@ -138,6 +138,8 @@ cdef class Buffer: cpdef inline int32_t read_varint32(self) + cpdef inline int32_t read_varuint32(self) + cpdef put_buffer(self, uint32_t offset, v, int32_t src_index, int32_t length) cdef inline write_c_buffer(self, const uint8_t* value, int32_t length) diff --git a/python/pyfury/_util.pyx b/python/pyfury/_util.pyx 
index b758705613..e6a1d7a102 100644 --- a/python/pyfury/_util.pyx +++ b/python/pyfury/_util.pyx @@ -366,6 +366,10 @@ cdef class Buffer: return actual_bytes_written cpdef inline int32_t read_varint32(self): + # TODO(chaokunyang): add zigzag decoding; until then this simply delegates to read_varuint32. + return self.read_varuint32() + + cpdef inline int32_t read_varuint32(self): cdef: uint32_t read_length = 0 int8_t b diff --git a/python/pyfury/error.py b/python/pyfury/error.py index 91088d4815..f2e7c7a67b 100644 --- a/python/pyfury/error.py +++ b/python/pyfury/error.py @@ -24,5 +24,9 @@ class ClassNotCompatibleError(FuryError): pass +class TypeUnregisteredError(FuryError): + pass + + class CompileError(FuryError): pass diff --git a/python/pyfury/serializer.py b/python/pyfury/serializer.py index b348be83d0..7936c3e74f 100644 --- a/python/pyfury/serializer.py +++ b/python/pyfury/serializer.py @@ -396,3 +396,178 @@ def xwrite(self, buffer: Buffer, value): def xread(self, buffer): raise NotImplementedError + + +# Use numpy array or python array module. +typecode_dict = { + # use bytes serializer for byte array.
+ "h": (2, Int16ArrayType, FuryType.INT16_ARRAY.value), + "i": (4, Int32ArrayType, FuryType.INT32_ARRAY.value), + "l": (8, Int64ArrayType, FuryType.INT64_ARRAY.value), + "f": (4, Float32ArrayType, FuryType.FLOAT32_ARRAY.value), + "d": (8, Float64ArrayType, FuryType.FLOAT64_ARRAY.value), +} + +typeid_code = { + FuryType.INT16_ARRAY.value: "h", + FuryType.INT32_ARRAY.value: "i", + FuryType.INT64_ARRAY.value: "l", + FuryType.FLOAT32_ARRAY.value: "f", + FuryType.FLOAT64_ARRAY.value: "d", +} + +class PyArraySerializer(CrossLanguageCompatibleSerializer): + typecodearray_type = { + "h": Int16ArrayType, + "i": Int32ArrayType, + "l": Int64ArrayType, + "f": Float32ArrayType, + "d": Float64ArrayType, + } + + def __init__(self, fury, type_, typecode: str): + super().__init__(fury, type_) + self.typecode = typecode + self.itemsize, ftype, self.type_id = typecode_dict[self.typecode] + + def xwrite(self, buffer, value): + assert value.itemsize == self.itemsize + view = memoryview(value) + assert view.format == self.typecode + assert view.itemsize == self.itemsize + assert view.c_contiguous # TODO handle contiguous + nbytes = len(value) * self.itemsize + buffer.write_varint32(nbytes) + buffer.write_buffer(value) + + def xread(self, buffer): + data = buffer.read_bytes_and_size() + arr = array.array(self.typecode, []) + arr.frombytes(data) + return arr + + def write(self, buffer, value: array.array): + nbytes = len(value) * value.itemsize + buffer.write_string(value.typecode) + buffer.write_varint32(nbytes) + buffer.write_buffer(value) + + def read(self, buffer): + typecode = buffer.read_string() + data = buffer.read_bytes_and_size() + arr = array.array(typecode, []) + arr.frombytes(data) + return arr + + +class DynamicPyArraySerializer: + + def xwrite(self, buffer, value): + itemsize, ftype, type_id = typecode_dict[value.format] + view = memoryview(value) + nbytes = len(value) * itemsize + buffer.write_varint32(type_id) + buffer.write_varint32(nbytes) + if value.dtype == 
np.dtype("bool") or not view.c_contiguous: + buffer.write_bytes(value.tobytes()) + else: + buffer.write_buffer(value) + + def xread(self, buffer): + type_id = buffer.read_varint32() + typecode = typeid_code[type_id] + data = buffer.read_bytes_and_size() + arr = array.array(typecode, []) + arr.frombytes(data) + return arr + + def write(self, buffer, value): + self.fury.handle_unsupported_write(buffer, value) + + def read(self, buffer): + return self.fury.handle_unsupported_read(buffer) + + +if np: + _np_dtypes_dict = { + # use bytes serializer for byte array. + np.dtype(np.bool_): (1, "?", BoolArrayType, FuryType.BOOL_ARRAY.value), + np.dtype(np.int16): (2, "h", Int16ArrayType, FuryType.INT16_ARRAY.value), + np.dtype(np.int32): (4, "i", Int32ArrayType, FuryType.INT32_ARRAY.value), + np.dtype(np.int64): (8, "l", Int64ArrayType, FuryType.INT64_ARRAY.value), + np.dtype(np.float32): (4, "f", Float32ArrayType, FuryType.FLOAT32_ARRAY.value), + np.dtype(np.float64): (8, "d", Float64ArrayType, FuryType.FLOAT64_ARRAY.value), + } +else: + _np_dtypes_dict = {} + + +class Numpy1DArraySerializer(Serializer): + dtypes_dict = _np_dtypes_dict + + def __init__(self, fury, type_, dtype): + super().__init__(fury, type_) + self.dtype = dtype + self.itemsize, self.typecode, ftype, self.type_id = _np_dtypes_dict[self.dtype] + + def xwrite(self, buffer, value): + assert value.itemsize == self.itemsize + view = memoryview(value) + try: + assert view.format == self.typecode + except AssertionError as e: + raise e + assert view.itemsize == self.itemsize + nbytes = len(value) * self.itemsize + buffer.write_varint32(nbytes) + if self.dtype == np.dtype("bool") or not view.c_contiguous: + buffer.write_bytes(value.tobytes()) + else: + buffer.write_buffer(value) + + def xread(self, buffer): + data = buffer.read_bytes_and_size() + return np.frombuffer(data, dtype=self.dtype) + + def write(self, buffer, value): + self.fury.handle_unsupported_write(buffer, value) + + def read(self, buffer): + return
self.fury.handle_unsupported_read(buffer) + + +class NDArraySerializer(Serializer): + + def xwrite(self, buffer, value): + type_id = _np_dtypes_dict[value.dtype][-1] + view = memoryview(value) + nbytes = value.nbytes + buffer.write_varint32(type_id) + buffer.write_varint32(nbytes) + if value.dtype == np.dtype("bool") or not view.c_contiguous: + buffer.write_bytes(value.tobytes()) + else: + buffer.write_buffer(value) + + def xread(self, buffer): + raise NotImplementedError("Multi-dimensional array not supported currently") + + def write(self, buffer, value): + self.fury.handle_unsupported_write(buffer, value) + + def read(self, buffer): + return self.fury.handle_unsupported_read(buffer) + + +class PickleSerializer(Serializer): + def xwrite(self, buffer, value): + raise NotImplementedError + + def xread(self, buffer): + raise NotImplementedError + + def write(self, buffer, value): + self.fury.handle_unsupported_write(buffer, value) + + def read(self, buffer): + return self.fury.handle_unsupported_read(buffer) diff --git a/python/pyfury/tests/test_cross_language.py b/python/pyfury/tests/test_cross_language.py index 9c0cc93e95..82f1c5d12b 100644 --- a/python/pyfury/tests/test_cross_language.py +++ b/python/pyfury/tests/test_cross_language.py @@ -444,7 +444,7 @@ class ComplexObject2: def test_serialize_simple_struct_local(): fury = pyfury.Fury(language=pyfury.Language.XLANG, ref_tracking=True) - fury.register_class(ComplexObject2, type_tag="test.ComplexObject2") + fury.register_type(ComplexObject2, type_tag="test.ComplexObject2") obj = ComplexObject2(f1=True, f2={-1: 2}) new_buf = fury.serialize(obj) assert fury.deserialize(new_buf) == obj @@ -453,7 +453,7 @@ def test_serialize_simple_struct_local(): @cross_language_test def test_serialize_simple_struct(data_file_path): fury = pyfury.Fury(language=pyfury.Language.XLANG, ref_tracking=True) - fury.register_class(ComplexObject2, type_tag="test.ComplexObject2") + fury.register_type(ComplexObject2,
type_tag="test.ComplexObject2") obj = ComplexObject2(f1=True, f2={-1: 2}) struct_round_back(data_file_path, fury, obj) @@ -461,8 +461,8 @@ def test_serialize_simple_struct(data_file_path): @cross_language_test def test_serialize_complex_struct(data_file_path): fury = pyfury.Fury(language=pyfury.Language.XLANG, ref_tracking=True) - fury.register_class(ComplexObject1, type_tag="test.ComplexObject1") - fury.register_class(ComplexObject2, type_tag="test.ComplexObject2") + fury.register_type(ComplexObject1, type_tag="test.ComplexObject1") + fury.register_type(ComplexObject2, type_tag="test.ComplexObject2") obj2 = ComplexObject2(f1=True, f2={-1: 2}) obj1 = ComplexObject1( @@ -504,7 +504,7 @@ def test_serialize_opaque_object(data_file_path): data_bytes = f.read() debug_print(f"len {len(data_bytes)}") fury = pyfury.Fury(language=pyfury.Language.XLANG, ref_tracking=True) - fury.register_class(ComplexObject1, type_tag="test.ComplexObject1") + fury.register_type(ComplexObject1, type_tag="test.ComplexObject1") new_obj = fury.deserialize(data_bytes) debug_print(new_obj) assert new_obj.f2 == "abc" @@ -519,12 +519,6 @@ def test_serialize_opaque_object(data_file_path): class ComplexObject1Serializer(pyfury.serializer.Serializer): - def get_xtype_id(self): - return pyfury.type.FuryType.FURY_TYPE_TAG.value - - def get_xtype_tag(self): - return "test.ComplexObject1" - def write(self, buffer, value): self.xwrite(buffer, value) diff --git a/python/pyfury/tests/test_serializer.py b/python/pyfury/tests/test_serializer.py index 59da5b1567..4ee48dedf8 100644 --- a/python/pyfury/tests/test_serializer.py +++ b/python/pyfury/tests/test_serializer.py @@ -333,9 +333,6 @@ def xread(self, buffer): def get_xtype_id(self): return pyfury.FuryType.FURY_TYPE_TAG.value - def get_xtype_tag(self): - return "test.Bar" - class RegisterClass: def __init__(self, f1=None): @@ -372,7 +369,7 @@ class C: pass -def test_register_class(): +def test_register_type(): fury = Fury(language=Language.PYTHON, 
ref_tracking=True) class Serializer(pyfury.Serializer): diff --git a/python/pyfury/tests/test_struct.py b/python/pyfury/tests/test_struct.py index d1412026dd..8636cbdc04 100644 --- a/python/pyfury/tests/test_struct.py +++ b/python/pyfury/tests/test_struct.py @@ -51,8 +51,8 @@ class ComplexObject: def test_struct(): fury = Fury(language=Language.XLANG, ref_tracking=True) - fury.register_class(SimpleObject, type_tag="example.SimpleObject") - fury.register_class(ComplexObject, type_tag="example.ComplexObject") + fury.register_type(SimpleObject, type_tag="example.SimpleObject") + fury.register_type(ComplexObject, type_tag="example.ComplexObject") o = SimpleObject(f1={1: 1.0 / 3}) # assert ser_de(fury, o) == o