From 6faf3bf15c22a18984e132d9dd7e65a9cf670362 Mon Sep 17 00:00:00 2001 From: Srikanta <51379715+srnagar@users.noreply.github.com> Date: Wed, 16 Mar 2022 18:51:42 +0000 Subject: [PATCH] Cache schema strings when deserializing messages (#27688) * Enable decoding same message multiple times * fix merge conflicts * update changelog --- .../CHANGELOG.md | 2 + .../apacheavro/AvroSerializer.java | 25 +++++++++--- ...maRegistryApacheAvroSerializerBuilder.java | 8 ++-- .../apacheavro/AvroSerializerTest.java | 37 +++++++----------- ...chemaRegistryApacheAvroSerializerTest.java | 39 +++++++++++-------- 5 files changed, 62 insertions(+), 49 deletions(-) diff --git a/sdk/schemaregistry/azure-data-schemaregistry-apacheavro/CHANGELOG.md b/sdk/schemaregistry/azure-data-schemaregistry-apacheavro/CHANGELOG.md index ecc7bfc872028..b4c9f4a9266a3 100644 --- a/sdk/schemaregistry/azure-data-schemaregistry-apacheavro/CHANGELOG.md +++ b/sdk/schemaregistry/azure-data-schemaregistry-apacheavro/CHANGELOG.md @@ -7,6 +7,8 @@ ### Breaking Changes ### Bugs Fixed +- Fixed a bug that caused the deserialize operation to throw `SchemaParseException` when multiple messages with the same schema + were deserialized (https://github.com/Azure/azure-sdk-for-java/issues/27602). 
### Other Changes diff --git a/sdk/schemaregistry/azure-data-schemaregistry-apacheavro/src/main/java/com/azure/data/schemaregistry/apacheavro/AvroSerializer.java b/sdk/schemaregistry/azure-data-schemaregistry-apacheavro/src/main/java/com/azure/data/schemaregistry/apacheavro/AvroSerializer.java index d5914efab42c8..77a0273249a58 100644 --- a/sdk/schemaregistry/azure-data-schemaregistry-apacheavro/src/main/java/com/azure/data/schemaregistry/apacheavro/AvroSerializer.java +++ b/sdk/schemaregistry/azure-data-schemaregistry-apacheavro/src/main/java/com/azure/data/schemaregistry/apacheavro/AvroSerializer.java @@ -34,6 +34,9 @@ import java.util.Map; import java.util.Objects; import java.util.Optional; +import java.util.concurrent.ConcurrentHashMap; + +import static com.azure.data.schemaregistry.apacheavro.SchemaRegistryApacheAvroSerializerBuilder.MAX_CACHE_SIZE; /** * Class containing implementation of Apache Avro serializer @@ -46,9 +49,9 @@ class AvroSerializer { private final ClientLogger logger = new ClientLogger(AvroSerializer.class); private final boolean avroSpecificReader; - private final Schema.Parser parser; private final EncoderFactory encoderFactory; private final DecoderFactory decoderFactory; + private final Map<String, Schema> parsedSchemas = new ConcurrentHashMap<>(); static { final HashMap<Class<?>, Schema> schemas = new HashMap<>(); @@ -93,15 +96,13 @@ class AvroSerializer { * * @param avroSpecificReader flag indicating if decoder should decode records as {@link SpecificRecord * SpecificRecords}. - * @param parser Schema parser to use. 
* @param encoderFactory Encoder factory * @param decoderFactory Decoder factory */ - AvroSerializer(boolean avroSpecificReader, Schema.Parser parser, EncoderFactory encoderFactory, + AvroSerializer(boolean avroSpecificReader, EncoderFactory encoderFactory, DecoderFactory decoderFactory) { this.avroSpecificReader = avroSpecificReader; - this.parser = Objects.requireNonNull(parser, "'parser' cannot be null."); this.encoderFactory = Objects.requireNonNull(encoderFactory, "'encoderFactory' cannot be null."); this.decoderFactory = Objects.requireNonNull(decoderFactory, "'decoderFactory' cannot be null."); } @@ -112,7 +113,21 @@ class AvroSerializer { * @return avro schema */ Schema parseSchemaString(String schemaString) { - return this.parser.parse(schemaString); + // Schema.Parser "remembers" all the named schemas previously parsed and throws + // SchemaParseException if an attempt to parse the same schema is made. + // So, we create a new instance of Schema.Parser each time since this method can + // be called multiple times for the same schema and there's no reliable way to know from the schema string + // that the named schema has not already been parsed. + + // We'll cache the entire schema string to minimize the need to parse the same string multiple times but we + // should switch to LRU cache as we don't want to store unlimited schemas and some schema strings can be very + // large and they should not be kept in memory if they're not actively used. 
+ + // TODO(srnagar): change to LRU cache after this PR is merged - https://github.com/Azure/azure-sdk-for-java/pull/27408 + if (parsedSchemas.size() > MAX_CACHE_SIZE) { + parsedSchemas.clear(); + } + return parsedSchemas.computeIfAbsent(schemaString, schema -> new Schema.Parser().parse(schema)); } /** diff --git a/sdk/schemaregistry/azure-data-schemaregistry-apacheavro/src/main/java/com/azure/data/schemaregistry/apacheavro/SchemaRegistryApacheAvroSerializerBuilder.java b/sdk/schemaregistry/azure-data-schemaregistry-apacheavro/src/main/java/com/azure/data/schemaregistry/apacheavro/SchemaRegistryApacheAvroSerializerBuilder.java index 1b3114adf3d84..3e8ae2389e4b7 100644 --- a/sdk/schemaregistry/azure-data-schemaregistry-apacheavro/src/main/java/com/azure/data/schemaregistry/apacheavro/SchemaRegistryApacheAvroSerializerBuilder.java +++ b/sdk/schemaregistry/azure-data-schemaregistry-apacheavro/src/main/java/com/azure/data/schemaregistry/apacheavro/SchemaRegistryApacheAvroSerializerBuilder.java @@ -6,7 +6,6 @@ import com.azure.core.util.CoreUtils; import com.azure.core.util.logging.ClientLogger; import com.azure.data.schemaregistry.SchemaRegistryAsyncClient; -import org.apache.avro.Schema; import org.apache.avro.io.DecoderFactory; import org.apache.avro.io.EncoderFactory; import org.apache.avro.specific.SpecificDatumReader; @@ -21,7 +20,7 @@ */ public final class SchemaRegistryApacheAvroSerializerBuilder { private static final boolean AVRO_SPECIFIC_READER_DEFAULT = false; - private static final int MAX_CACHE_SIZE = 128; + static final int MAX_CACHE_SIZE = 128; private final ClientLogger logger = new ClientLogger(SchemaRegistryApacheAvroSerializerBuilder.class); private Boolean autoRegisterSchemas; @@ -121,10 +120,9 @@ public SchemaRegistryApacheAvroSerializer buildSerializer() { final boolean useAvroSpecificReader = avroSpecificReader == null ? 
AVRO_SPECIFIC_READER_DEFAULT : avroSpecificReader; - final Schema.Parser parser = new Schema.Parser(); - final AvroSerializer codec = new AvroSerializer(useAvroSpecificReader, parser, - EncoderFactory.get(), DecoderFactory.get()); final SerializerOptions options = new SerializerOptions(schemaGroup, isAutoRegister, MAX_CACHE_SIZE); + final AvroSerializer codec = new AvroSerializer(useAvroSpecificReader, EncoderFactory.get(), + DecoderFactory.get()); return new SchemaRegistryApacheAvroSerializer(schemaRegistryAsyncClient, codec, options); } diff --git a/sdk/schemaregistry/azure-data-schemaregistry-apacheavro/src/test/java/com/azure/data/schemaregistry/apacheavro/AvroSerializerTest.java b/sdk/schemaregistry/azure-data-schemaregistry-apacheavro/src/test/java/com/azure/data/schemaregistry/apacheavro/AvroSerializerTest.java index 4111e54aa9661..a16d86eaf105b 100644 --- a/sdk/schemaregistry/azure-data-schemaregistry-apacheavro/src/test/java/com/azure/data/schemaregistry/apacheavro/AvroSerializerTest.java +++ b/sdk/schemaregistry/azure-data-schemaregistry-apacheavro/src/test/java/com/azure/data/schemaregistry/apacheavro/AvroSerializerTest.java @@ -70,11 +70,9 @@ public void afterEach() throws Exception { @Test public void constructorNull() { assertThrows(NullPointerException.class, - () -> new AvroSerializer(true, null, encoderFactory, decoderFactory)); + () -> new AvroSerializer(true, null, decoderFactory)); assertThrows(NullPointerException.class, - () -> new AvroSerializer(true, parser, null, decoderFactory)); - assertThrows(NullPointerException.class, - () -> new AvroSerializer(true, parser, encoderFactory, null)); + () -> new AvroSerializer(true, encoderFactory, null)); } public static Stream getSchemaStringPrimitive() { @@ -129,8 +127,7 @@ public void getSchemaGenericContainer() { @Test public void encodesObject() throws IOException { // Arrange - final AvroSerializer registryUtils = new AvroSerializer(false, parser, - encoderFactory, decoderFactory); + final 
AvroSerializer registryUtils = new AvroSerializer(false, encoderFactory, decoderFactory); final PlayingCard card = PlayingCard.newBuilder() .setPlayingCardSuit(PlayingCardSuit.DIAMONDS) @@ -157,14 +154,13 @@ public void encodesObject() throws IOException { @Test public void encodesAndDecodesObject() { // Arrange - final AvroSerializer registryUtils = new AvroSerializer(false, parser, - encoderFactory, decoderFactory); + final AvroSerializer registryUtils = new AvroSerializer(false, encoderFactory, decoderFactory); final PlayingCard expected = PlayingCard.newBuilder() - .setPlayingCardSuit(PlayingCardSuit.DIAMONDS) - .setIsFaceCard(true) - .setCardValue(13) - .build(); + .setPlayingCardSuit(PlayingCardSuit.DIAMONDS) + .setIsFaceCard(true) + .setCardValue(13) + .build(); // Using the raw message encoder because the default card.getByteBuffer() uses BinaryMessageEncoder which adds // a header. @@ -173,7 +169,7 @@ public void encodesAndDecodesObject() { // Act final PlayingCard actual = registryUtils.decode(ByteBuffer.wrap(encoded), schemaBytes, - TypeReference.createInstance(PlayingCard.class)); + TypeReference.createInstance(PlayingCard.class)); // Assert assertCardEquals(expected, actual); @@ -187,8 +183,7 @@ public void encodesAndDecodesObject() { @Test public void decodeSingleObjectEncodedObject() throws IOException { // Arrange - final AvroSerializer registryUtils = new AvroSerializer(false, parser, - encoderFactory, decoderFactory); + final AvroSerializer registryUtils = new AvroSerializer(false, encoderFactory, decoderFactory); final PlayingCard card = PlayingCard.newBuilder() .setPlayingCardSuit(PlayingCardSuit.DIAMONDS) @@ -221,11 +216,10 @@ public void decodeSingleObjectEncodedObject() throws IOException { expected.getCards().forEach(expectedCard -> { final int expectedSize = list.size() - 1; - assertTrue(list.removeIf(playingCard -> { - return expectedCard.getIsFaceCard() == playingCard.getIsFaceCard() - && expectedCard.getCardValue() == 
playingCard.getCardValue() - && expectedCard.getPlayingCardSuit() == playingCard.getPlayingCardSuit(); - })); + assertTrue(list.removeIf(playingCard -> + expectedCard.getIsFaceCard() == playingCard.getIsFaceCard() + && expectedCard.getCardValue() == playingCard.getCardValue() + && expectedCard.getPlayingCardSuit() == playingCard.getPlayingCardSuit())); assertEquals(expectedSize, list.size()); }); @@ -348,8 +342,7 @@ public static Stream getSchemaForTypeReference() { @ParameterizedTest public void getSchemaForTypeReference(TypeReference typeReference, Schema expected) { // Arrange - final AvroSerializer registryUtils = new AvroSerializer(false, parser, - encoderFactory, decoderFactory); + final AvroSerializer registryUtils = new AvroSerializer(false, encoderFactory, decoderFactory); final Class clazz = typeReference.getJavaClass(); // Act diff --git a/sdk/schemaregistry/azure-data-schemaregistry-apacheavro/src/test/java/com/azure/data/schemaregistry/apacheavro/SchemaRegistryApacheAvroSerializerTest.java b/sdk/schemaregistry/azure-data-schemaregistry-apacheavro/src/test/java/com/azure/data/schemaregistry/apacheavro/SchemaRegistryApacheAvroSerializerTest.java index cf11a7db0b028..cbd09ac8a4c89 100644 --- a/sdk/schemaregistry/azure-data-schemaregistry-apacheavro/src/test/java/com/azure/data/schemaregistry/apacheavro/SchemaRegistryApacheAvroSerializerTest.java +++ b/sdk/schemaregistry/azure-data-schemaregistry-apacheavro/src/test/java/com/azure/data/schemaregistry/apacheavro/SchemaRegistryApacheAvroSerializerTest.java @@ -82,7 +82,6 @@ public class SchemaRegistryApacheAvroSerializerTest { private static final EncoderFactory ENCODER_FACTORY = EncoderFactory.get(); private InterceptorManager interceptorManager; - private Schema.Parser parser; private AutoCloseable mocksCloseable; private TestInfo testInfo; @@ -108,7 +107,6 @@ public void beforeEach(TestInfo testInfo) { this.testInfo = testInfo; this.mocksCloseable = MockitoAnnotations.openMocks(this); - this.parser = new 
Schema.Parser(); } @AfterEach @@ -125,7 +123,7 @@ public void afterEach() throws Exception { @Test public void testRegistryGuidPrefixedToPayload() { // manually add SchemaRegistryObject into mock registry client cache - final AvroSerializer avroSerializer = new AvroSerializer(false, new Schema.Parser(), + final AvroSerializer avroSerializer = new AvroSerializer(false, ENCODER_FACTORY, DECODER_FACTORY); final PlayingCard playingCard = new PlayingCard(true, 10, PlayingCardSuit.DIAMONDS); final Schema playingClassSchema = PlayingCard.getClassSchema(); @@ -152,7 +150,7 @@ public void testRegistryGuidPrefixedToPayload() { @Test public void testNullPayloadThrowsSerializationException() { // Arrange - final AvroSerializer avroSerializer = new AvroSerializer(false, parser, ENCODER_FACTORY, + final AvroSerializer avroSerializer = new AvroSerializer(false, ENCODER_FACTORY, DECODER_FACTORY); final SerializerOptions serializerOptions = new SerializerOptions(MOCK_SCHEMA_GROUP, false, MOCK_CACHE_SIZE); final SchemaRegistryApacheAvroSerializer encoder = new SchemaRegistryApacheAvroSerializer(client, avroSerializer, @@ -168,7 +166,7 @@ public void testNullPayloadThrowsSerializationException() { @Test public void testIfRegistryNullThenThrow() { // Arrange - AvroSerializer encoder = new AvroSerializer(false, parser, ENCODER_FACTORY, + AvroSerializer encoder = new AvroSerializer(false, ENCODER_FACTORY, DECODER_FACTORY); final SerializerOptions serializerOptions = new SerializerOptions(MOCK_SCHEMA_GROUP, false, MOCK_CACHE_SIZE); @@ -180,7 +178,7 @@ public void testIfRegistryNullThenThrow() { @Test void testGetSchemaAndDeserialize() throws IOException { // manually add SchemaRegistryObject to cache - final AvroSerializer decoder = new AvroSerializer(false, parser, ENCODER_FACTORY, + final AvroSerializer decoder = new AvroSerializer(false, ENCODER_FACTORY, DECODER_FACTORY); final PlayingCard playingCard = new PlayingCard(true, 10, PlayingCardSuit.DIAMONDS); final String 
playingClassSchema = PlayingCard.getClassSchema().toString(); @@ -213,6 +211,15 @@ void testGetSchemaAndDeserialize() throws IOException { assertEquals(playingCard.getIsFaceCard(), actual.getIsFaceCard()); }) .verifyComplete(); + + // Deserializing the same message again should work. + StepVerifier.create(encoder.deserializeMessageDataAsync(message, TypeReference.createInstance(PlayingCard.class))) + .assertNext(actual -> { + assertEquals(playingCard.getPlayingCardSuit(), actual.getPlayingCardSuit()); + assertEquals(playingCard.getCardValue(), actual.getCardValue()); + assertEquals(playingCard.getIsFaceCard(), actual.getIsFaceCard()); + }) + .verifyComplete(); } public static Stream testEmptyPayload() { @@ -230,7 +237,7 @@ public static Stream testEmptyPayload() { @ParameterizedTest public void testEmptyPayload(MessageWithMetadata message) { // Arrange - final AvroSerializer avroSerializer = new AvroSerializer(false, parser, ENCODER_FACTORY, DECODER_FACTORY); + final AvroSerializer avroSerializer = new AvroSerializer(false, ENCODER_FACTORY, DECODER_FACTORY); final SerializerOptions serializerOptions = new SerializerOptions(MOCK_SCHEMA_GROUP, true, MOCK_CACHE_SIZE); final SchemaRegistryApacheAvroSerializer encoder = new SchemaRegistryApacheAvroSerializer(client, @@ -248,7 +255,7 @@ public void testEmptyPayload(MessageWithMetadata message) { @Test public void testEmptyPayloadSync() { // Arrange - final AvroSerializer avroSerializer = new AvroSerializer(false, parser, ENCODER_FACTORY, DECODER_FACTORY); + final AvroSerializer avroSerializer = new AvroSerializer(false, ENCODER_FACTORY, DECODER_FACTORY); final SerializerOptions serializerOptions = new SerializerOptions(MOCK_SCHEMA_GROUP, true, MOCK_CACHE_SIZE); final MockMessage message = new MockMessage(); @@ -267,7 +274,7 @@ public void testEmptyPayloadSync() { */ @Test public void testNullPayload() { - final AvroSerializer avroSerializer = new AvroSerializer(false, parser, ENCODER_FACTORY, DECODER_FACTORY); + final 
AvroSerializer avroSerializer = new AvroSerializer(false, ENCODER_FACTORY, DECODER_FACTORY); final SerializerOptions serializerOptions = new SerializerOptions(MOCK_SCHEMA_GROUP, true, MOCK_CACHE_SIZE); final SchemaRegistryApacheAvroSerializer encoder = new SchemaRegistryApacheAvroSerializer(client, @@ -284,7 +291,7 @@ public void testNullPayload() { */ @Test public void testNullPayloadSync() { - final AvroSerializer avroSerializer = new AvroSerializer(false, parser, ENCODER_FACTORY, DECODER_FACTORY); + final AvroSerializer avroSerializer = new AvroSerializer(false, ENCODER_FACTORY, DECODER_FACTORY); final SerializerOptions serializerOptions = new SerializerOptions(MOCK_SCHEMA_GROUP, true, MOCK_CACHE_SIZE); SchemaRegistryApacheAvroSerializer encoder = new SchemaRegistryApacheAvroSerializer( @@ -394,7 +401,7 @@ public void serializeForwardCompatibility() { @Test public void throwsWhenConstructorNotAvailable() { // Arrange - final AvroSerializer avroSerializer = new AvroSerializer(false, parser, ENCODER_FACTORY, DECODER_FACTORY); + final AvroSerializer avroSerializer = new AvroSerializer(false, ENCODER_FACTORY, DECODER_FACTORY); final SerializerOptions serializerOptions = new SerializerOptions(MOCK_SCHEMA_GROUP, true, MOCK_CACHE_SIZE); final SchemaRegistryApacheAvroSerializer encoder = new SchemaRegistryApacheAvroSerializer( client, avroSerializer, serializerOptions); @@ -421,7 +428,7 @@ public void backwardsCompatiblePreamble() throws IOException { .build(); final SchemaRegistrySchema schemaResponse = new SchemaRegistrySchema( new SchemaProperties(MOCK_GUID, SchemaFormat.AVRO), expected.getSchema().toString()); - final AvroSerializer avroSerializer = new AvroSerializer(true, parser, ENCODER_FACTORY, + final AvroSerializer avroSerializer = new AvroSerializer(true, ENCODER_FACTORY, DECODER_FACTORY); final SerializerOptions serializerOptions = new SerializerOptions(MOCK_SCHEMA_GROUP, true, MOCK_CACHE_SIZE); final SchemaRegistryApacheAvroSerializer encoder = new 
SchemaRegistryApacheAvroSerializer(client, avroSerializer, @@ -497,11 +504,9 @@ private SchemaRegistryAsyncClient getSchemaRegistryClient(TestInfo testInfo, Tes tokenCredential = mock(TokenCredential.class); // Sometimes it throws an "NotAMockException", so we had to change from thenReturn to thenAnswer. - when(tokenCredential.getToken(any(TokenRequestContext.class))).thenAnswer(invocationOnMock -> { - return Mono.fromCallable(() -> { - return new AccessToken("foo", OffsetDateTime.now().plusMinutes(20)); - }); - }); + when(tokenCredential.getToken(any(TokenRequestContext.class))) + .thenAnswer(invocationOnMock -> Mono.fromCallable(() -> + new AccessToken("foo", OffsetDateTime.now().plusMinutes(20)))); endpoint = PLAYBACK_ENDPOINT; } else {