Skip to content

Commit

Permalink
[SchemaRegistry] remove all serializer caches (#21020)
Browse files Browse the repository at this point in the history
  • Loading branch information
swathipil authored Oct 1, 2021
1 parent 3e72868 commit 9b2a640
Show file tree
Hide file tree
Showing 5 changed files with 33 additions and 38 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,11 @@
# IN THE SOFTWARE.
#
# --------------------------------------------------------------------------
from typing import BinaryIO, Union, TypeVar, Dict
try:
from functools import lru_cache
except ImportError:
from backports.functools_lru_cache import lru_cache
from typing import BinaryIO, Union, TypeVar
from io import BytesIO
import avro
from avro.io import DatumWriter, DatumReader, BinaryDecoder, BinaryEncoder
Expand All @@ -38,9 +42,18 @@ def __init__(self, codec=None):
:param str codec: The writer codec. If None, let the avro library decides.
"""
self._writer_codec = codec
self._schema_writer_cache = {} # type: Dict[str, DatumWriter]
self._schema_reader_cache = {} # type: Dict[str, DatumReader]

@lru_cache(maxsize=128)
def _get_schema_writer(self, schema): # pylint: disable=no-self-use
schema = avro.schema.parse(schema)
return DatumWriter(schema)

@lru_cache(maxsize=128)
def _get_schema_reader(self, schema): # pylint: disable=no-self-use
schema = avro.schema.parse(schema)
return DatumReader(writers_schema=schema)

# pylint: disable=no-self-use
def serialize(
self,
data, # type: ObjectType
Expand All @@ -60,21 +73,15 @@ def serialize(
if not schema:
raise ValueError("Schema is required in Avro serializer.")

if not isinstance(schema, avro.schema.Schema):
schema = avro.schema.parse(schema)

try:
writer = self._schema_writer_cache[str(schema)]
except KeyError:
writer = DatumWriter(schema)
self._schema_writer_cache[str(schema)] = writer
writer = self._get_schema_writer(str(schema))

stream = BytesIO()
with stream:
writer.write(data, BinaryEncoder(stream))
encoded_data = stream.getvalue()
return encoded_data

# pylint: disable=no-self-use
def deserialize(
self,
data, # type: Union[bytes, BinaryIO]
Expand All @@ -93,14 +100,7 @@ def deserialize(
if not hasattr(data, 'read'):
data = BytesIO(data)

if not isinstance(schema, avro.schema.Schema):
schema = avro.schema.parse(schema)

try:
reader = self._schema_reader_cache[str(schema)]
except KeyError:
reader = DatumReader(writers_schema=schema)
self._schema_reader_cache[str(schema)] = reader
reader = self._get_schema_reader(str(schema))

with data:
bin_decoder = BinaryDecoder(data)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,6 @@ def __init__(self, **kwargs):
if self._auto_register_schemas
else self._schema_registry_client.get_schema_id
)
self._user_input_schema_cache = {}

def __enter__(self):
# type: () -> SchemaRegistryAvroSerializer
Expand Down Expand Up @@ -115,6 +114,11 @@ def _get_schema(self, schema_id, **kwargs):
).schema_content
return schema_str

@classmethod
@lru_cache(maxsize=128)
def _parse_schema(cls, schema):
return avro.schema.parse(schema)

def serialize(self, value, **kwargs):
# type: (Mapping[str, Any], Any) -> bytes
"""
Expand All @@ -132,13 +136,8 @@ def serialize(self, value, **kwargs):
raw_input_schema = kwargs.pop("schema")
except KeyError as e:
raise TypeError("'{}' is a required keyword.".format(e.args[0]))
try:
cached_schema = self._user_input_schema_cache[raw_input_schema]
except KeyError:
parsed_schema = avro.schema.parse(raw_input_schema)
self._user_input_schema_cache[raw_input_schema] = parsed_schema
cached_schema = parsed_schema

cached_schema = AvroSerializer._parse_schema(raw_input_schema)
record_format_identifier = b"\0\0\0\0"
schema_id = self._get_schema_id(cached_schema.fullname, str(cached_schema), **kwargs)
data_bytes = self._avro_serializer.serialize(value, cached_schema)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,12 @@ interactions:
uri: https://fake_resource.servicebus.windows.net/$schemagroups/fakegroup/schemas/example.avro.User?api-version=2017-04
response:
body:
string: '{"id":"f666e373299048fabaa4296f5dbfed46"}'
string: '{"id":"7b4eff1c25d9438a975ff7a3d985a5c6"}'
headers:
content-type:
- application/json
date:
- Thu, 30 Sep 2021 02:05:53 GMT
- Fri, 01 Oct 2021 22:19:06 GMT
location:
- https://swathip-test-eventhubs.servicebus.windows.net:443/$schemagroups/fakegroup/schemas/example.avro.User/versions/1?api-version=2017-04
server:
Expand All @@ -38,9 +38,9 @@ interactions:
transfer-encoding:
- chunked
x-schema-id:
- f666e373299048fabaa4296f5dbfed46
- 7b4eff1c25d9438a975ff7a3d985a5c6
x-schema-id-location:
- https://swathip-test-eventhubs.servicebus.windows.net:443/$schemagroups/getschemabyid/f666e373299048fabaa4296f5dbfed46?api-version=2017-04
- https://swathip-test-eventhubs.servicebus.windows.net:443/$schemagroups/getschemabyid/7b4eff1c25d9438a975ff7a3d985a5c6?api-version=2017-04
x-schema-type:
- Avro
x-schema-version:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,12 @@ interactions:
uri: https://fake_resource.servicebus.windows.net/$schemagroups/fakegroup/schemas/example.avro.User?api-version=2017-04
response:
body:
string: '{"id":"f666e373299048fabaa4296f5dbfed46"}'
string: '{"id":"7b4eff1c25d9438a975ff7a3d985a5c6"}'
headers:
content-type:
- application/json
date:
- Thu, 30 Sep 2021 02:05:54 GMT
- Fri, 01 Oct 2021 22:19:07 GMT
location:
- https://swathip-test-eventhubs.servicebus.windows.net:443/$schemagroups/fakegroup/schemas/example.avro.User/versions/1?api-version=2017-04
server:
Expand All @@ -38,9 +38,9 @@ interactions:
transfer-encoding:
- chunked
x-schema-id:
- f666e373299048fabaa4296f5dbfed46
- 7b4eff1c25d9438a975ff7a3d985a5c6
x-schema-id-location:
- https://swathip-test-eventhubs.servicebus.windows.net:443/$schemagroups/getschemabyid/f666e373299048fabaa4296f5dbfed46?api-version=2017-04
- https://swathip-test-eventhubs.servicebus.windows.net:443/$schemagroups/getschemabyid/7b4eff1c25d9438a975ff7a3d985a5c6?api-version=2017-04
x-schema-type:
- Avro
x-schema-version:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,8 +86,6 @@ def test_basic_sr_avro_serializer_with_auto_register_schemas(self, schemaregistr
dict_data = {"name": u"Ben", "favorite_number": 7, "favorite_color": u"red"}
encoded_data = sr_avro_serializer.serialize(dict_data, schema=schema_str)

assert schema_str in sr_avro_serializer._user_input_schema_cache

assert encoded_data[0:4] == b'\0\0\0\0'
schema_id = sr_client.get_schema_id(schemaregistry_group, schema.fullname, "Avro", str(schema)).schema_id
assert encoded_data[4:36] == schema_id.encode("utf-8")
Expand All @@ -111,8 +109,6 @@ def test_basic_sr_avro_serializer_without_auto_register_schemas(self, schemaregi
dict_data = {"name": u"Ben", "favorite_number": 7, "favorite_color": u"red"}
encoded_data = sr_avro_serializer.serialize(dict_data, schema=schema_str)

assert schema_str in sr_avro_serializer._user_input_schema_cache

assert encoded_data[0:4] == b'\0\0\0\0'
schema_id = sr_client.get_schema_id(schemaregistry_group, schema.fullname, "Avro", str(schema)).schema_id
assert encoded_data[4:36] == schema_id.encode("utf-8")
Expand Down

0 comments on commit 9b2a640

Please sign in to comment.