Cache schema strings when deserializing messages (#27688)
* Enable decoding same message multiple times

* fix merge conflicts

* update changelog
srnagar authored Mar 16, 2022
1 parent 427d396 commit 6faf3bf
Showing 5 changed files with 62 additions and 49 deletions.
@@ -7,6 +7,8 @@
### Breaking Changes

### Bugs Fixed
- Fixed a bug that caused the deserialize operation to throw `SchemaParseException` when multiple messages with the same
  schema were deserialized (https://github.com/Azure/azure-sdk-for-java/issues/27602).

### Other Changes

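To make the failure mode described in the changelog entry above concrete, here is a minimal repro sketch. It assumes a record schema named PlayingCard (matching the test classes later in this diff); the class and schema string are illustrative and not part of the change itself.

import org.apache.avro.Schema;
import org.apache.avro.SchemaParseException;

public final class SharedParserRepro {
    // Any named record schema triggers the same behavior; this one is illustrative.
    private static final String SCHEMA_STRING =
        "{\"type\":\"record\",\"name\":\"PlayingCard\",\"fields\":"
            + "[{\"name\":\"cardValue\",\"type\":\"int\"}]}";

    public static void main(String[] args) {
        Schema.Parser sharedParser = new Schema.Parser();
        Schema first = sharedParser.parse(SCHEMA_STRING);
        System.out.println("First parse succeeded: " + first.getFullName());

        try {
            // Schema.Parser remembers the named types it has already parsed, so a
            // second parse of the same schema string is treated as a redefinition.
            sharedParser.parse(SCHEMA_STRING);
        } catch (SchemaParseException e) {
            System.out.println("Second parse failed: " + e.getMessage());
        }
    }
}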
@@ -34,6 +34,9 @@
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;

import static com.azure.data.schemaregistry.apacheavro.SchemaRegistryApacheAvroSerializerBuilder.MAX_CACHE_SIZE;

/**
* Class containing implementation of Apache Avro serializer
@@ -46,9 +49,9 @@ class AvroSerializer {

private final ClientLogger logger = new ClientLogger(AvroSerializer.class);
private final boolean avroSpecificReader;
private final Schema.Parser parser;
private final EncoderFactory encoderFactory;
private final DecoderFactory decoderFactory;
private final Map<String, Schema> parsedSchemas = new ConcurrentHashMap<>();

static {
final HashMap<Class<?>, Schema> schemas = new HashMap<>();
@@ -93,15 +96,13 @@ class AvroSerializer {
*
* @param avroSpecificReader flag indicating if decoder should decode records as {@link SpecificRecord
* SpecificRecords}.
* @param parser Schema parser to use.
* @param encoderFactory Encoder factory
* @param decoderFactory Decoder factory
*/
AvroSerializer(boolean avroSpecificReader, Schema.Parser parser, EncoderFactory encoderFactory,
AvroSerializer(boolean avroSpecificReader, EncoderFactory encoderFactory,
DecoderFactory decoderFactory) {

this.avroSpecificReader = avroSpecificReader;
this.parser = Objects.requireNonNull(parser, "'parser' cannot be null.");
this.encoderFactory = Objects.requireNonNull(encoderFactory, "'encoderFactory' cannot be null.");
this.decoderFactory = Objects.requireNonNull(decoderFactory, "'decoderFactory' cannot be null.");
}
@@ -112,7 +113,21 @@ class AvroSerializer {
* @return avro schema
*/
Schema parseSchemaString(String schemaString) {
return this.parser.parse(schemaString);
// Schema.Parser "remembers" every named schema it has previously parsed and throws
// SchemaParseException if the same named schema is parsed again. So, we create a new
// Schema.Parser instance on each call, since this method can be invoked multiple times for the
// same schema and there is no reliable way to tell from the schema string alone whether the
// named schema has already been parsed.

// We cache parsed schemas keyed by the full schema string to minimize re-parsing the same string.
// This should switch to an LRU cache: we don't want to store an unbounded number of schemas, and
// some schema strings are very large and should not be kept in memory when not actively used.

// TODO(srnagar): change to LRU cache after this PR is merged - https://github.com/Azure/azure-sdk-for-java/pull/27408
if (parsedSchemas.size() > MAX_CACHE_SIZE) {
parsedSchemas.clear();
}
return parsedSchemas.computeIfAbsent(schemaString, schema -> new Schema.Parser().parse(schema));
}
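The TODO above points at an LRU cache as the longer-term fix. A minimal sketch of one way that could look, assuming a synchronized, access-ordered LinkedHashMap that evicts its eldest entry; the LruSchemaCache class here is illustrative only and is not the implementation from the linked PR.

import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;

import org.apache.avro.Schema;

final class LruSchemaCache {
    private final Map<String, Schema> cache;

    LruSchemaCache(int maxSize) {
        // Access-ordered LinkedHashMap: the least-recently-used entry is evicted
        // once the cache grows past maxSize.
        this.cache = Collections.synchronizedMap(new LinkedHashMap<String, Schema>(16, 0.75f, true) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<String, Schema> eldest) {
                return size() > maxSize;
            }
        });
    }

    Schema get(String schemaString) {
        // A fresh Schema.Parser per cache miss avoids the named-schema redefinition problem.
        return cache.computeIfAbsent(schemaString, s -> new Schema.Parser().parse(s));
    }
}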

/**
@@ -6,7 +6,6 @@
import com.azure.core.util.CoreUtils;
import com.azure.core.util.logging.ClientLogger;
import com.azure.data.schemaregistry.SchemaRegistryAsyncClient;
import org.apache.avro.Schema;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.io.EncoderFactory;
import org.apache.avro.specific.SpecificDatumReader;
@@ -21,7 +20,7 @@
*/
public final class SchemaRegistryApacheAvroSerializerBuilder {
private static final boolean AVRO_SPECIFIC_READER_DEFAULT = false;
private static final int MAX_CACHE_SIZE = 128;
static final int MAX_CACHE_SIZE = 128;

private final ClientLogger logger = new ClientLogger(SchemaRegistryApacheAvroSerializerBuilder.class);
private Boolean autoRegisterSchemas;
@@ -121,10 +120,9 @@ public SchemaRegistryApacheAvroSerializer buildSerializer() {

final boolean useAvroSpecificReader = avroSpecificReader == null
? AVRO_SPECIFIC_READER_DEFAULT : avroSpecificReader;
final Schema.Parser parser = new Schema.Parser();
final AvroSerializer codec = new AvroSerializer(useAvroSpecificReader, parser,
EncoderFactory.get(), DecoderFactory.get());
final SerializerOptions options = new SerializerOptions(schemaGroup, isAutoRegister, MAX_CACHE_SIZE);
final AvroSerializer codec = new AvroSerializer(useAvroSpecificReader, EncoderFactory.get(),
DecoderFactory.get());

return new SchemaRegistryApacheAvroSerializer(schemaRegistryAsyncClient, codec, options);
}
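For reference, end-to-end construction of the serializer through this builder might look like the sketch below. The setter names mirror the builder fields in this diff (schemaRegistryAsyncClient, schemaGroup, avroSpecificReader), and the registry client and credential calls are assumed from the companion azure-data-schemaregistry and azure-identity libraries; exact method names may differ between preview versions.

import com.azure.data.schemaregistry.SchemaRegistryAsyncClient;
import com.azure.data.schemaregistry.SchemaRegistryClientBuilder;
import com.azure.data.schemaregistry.apacheavro.SchemaRegistryApacheAvroSerializer;
import com.azure.data.schemaregistry.apacheavro.SchemaRegistryApacheAvroSerializerBuilder;
import com.azure.identity.DefaultAzureCredentialBuilder;

public final class SerializerBuilderSketch {
    public static void main(String[] args) {
        // Assumed construction of the registry client from azure-data-schemaregistry.
        SchemaRegistryAsyncClient registryClient = new SchemaRegistryClientBuilder()
            .fullyQualifiedNamespace("{namespace}.servicebus.windows.net")
            .credential(new DefaultAzureCredentialBuilder().build())
            .buildAsyncClient();

        // Builder setters assumed to match the fields shown in the diff above.
        SchemaRegistryApacheAvroSerializer serializer = new SchemaRegistryApacheAvroSerializerBuilder()
            .schemaRegistryAsyncClient(registryClient)
            .schemaGroup("{schema-group}")
            .avroSpecificReader(true)
            .buildSerializer();

        // With the schema-string cache, this serializer parses each distinct schema once and
        // reuses the cached org.apache.avro.Schema for later messages with the same schema.
        System.out.println("Built serializer: " + serializer);
    }
}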
@@ -70,11 +70,9 @@ public void afterEach() throws Exception {
@Test
public void constructorNull() {
assertThrows(NullPointerException.class,
() -> new AvroSerializer(true, null, encoderFactory, decoderFactory));
() -> new AvroSerializer(true, null, decoderFactory));
assertThrows(NullPointerException.class,
() -> new AvroSerializer(true, parser, null, decoderFactory));
assertThrows(NullPointerException.class,
() -> new AvroSerializer(true, parser, encoderFactory, null));
() -> new AvroSerializer(true, encoderFactory, null));
}

public static Stream<Arguments> getSchemaStringPrimitive() {
@@ -129,8 +127,7 @@ public void getSchemaGenericContainer() {
@Test
public void encodesObject() throws IOException {
// Arrange
final AvroSerializer registryUtils = new AvroSerializer(false, parser,
encoderFactory, decoderFactory);
final AvroSerializer registryUtils = new AvroSerializer(false, encoderFactory, decoderFactory);

final PlayingCard card = PlayingCard.newBuilder()
.setPlayingCardSuit(PlayingCardSuit.DIAMONDS)
@@ -157,14 +154,13 @@ public void encodesObject() throws IOException {
@Test
public void encodesAndDecodesObject() {
// Arrange
final AvroSerializer registryUtils = new AvroSerializer(false, parser,
encoderFactory, decoderFactory);
final AvroSerializer registryUtils = new AvroSerializer(false, encoderFactory, decoderFactory);

final PlayingCard expected = PlayingCard.newBuilder()
.setPlayingCardSuit(PlayingCardSuit.DIAMONDS)
.setIsFaceCard(true)
.setCardValue(13)
.build();
.setPlayingCardSuit(PlayingCardSuit.DIAMONDS)
.setIsFaceCard(true)
.setCardValue(13)
.build();

// Using the raw message encoder because the default card.getByteBuffer() uses BinaryMessageEncoder which adds
// a header.
@@ -173,7 +169,7 @@ public void encodesAndDecodesObject() {

// Act
final PlayingCard actual = registryUtils.decode(ByteBuffer.wrap(encoded), schemaBytes,
TypeReference.createInstance(PlayingCard.class));
TypeReference.createInstance(PlayingCard.class));

// Assert
assertCardEquals(expected, actual);
@@ -187,8 +183,7 @@ public void encodesAndDecodesObject() {
@Test
public void decodeSingleObjectEncodedObject() throws IOException {
// Arrange
final AvroSerializer registryUtils = new AvroSerializer(false, parser,
encoderFactory, decoderFactory);
final AvroSerializer registryUtils = new AvroSerializer(false, encoderFactory, decoderFactory);

final PlayingCard card = PlayingCard.newBuilder()
.setPlayingCardSuit(PlayingCardSuit.DIAMONDS)
@@ -221,11 +216,10 @@ public void decodeSingleObjectEncodedObject() throws IOException {
expected.getCards().forEach(expectedCard -> {
final int expectedSize = list.size() - 1;

assertTrue(list.removeIf(playingCard -> {
return expectedCard.getIsFaceCard() == playingCard.getIsFaceCard()
&& expectedCard.getCardValue() == playingCard.getCardValue()
&& expectedCard.getPlayingCardSuit() == playingCard.getPlayingCardSuit();
}));
assertTrue(list.removeIf(playingCard ->
expectedCard.getIsFaceCard() == playingCard.getIsFaceCard()
&& expectedCard.getCardValue() == playingCard.getCardValue()
&& expectedCard.getPlayingCardSuit() == playingCard.getPlayingCardSuit()));

assertEquals(expectedSize, list.size());
});
@@ -348,8 +342,7 @@ public static Stream<Arguments> getSchemaForTypeReference() {
@ParameterizedTest
public <T> void getSchemaForTypeReference(TypeReference<T> typeReference, Schema expected) {
// Arrange
final AvroSerializer registryUtils = new AvroSerializer(false, parser,
encoderFactory, decoderFactory);
final AvroSerializer registryUtils = new AvroSerializer(false, encoderFactory, decoderFactory);
final Class<T> clazz = typeReference.getJavaClass();

// Act
@@ -82,7 +82,6 @@ public class SchemaRegistryApacheAvroSerializerTest {
private static final EncoderFactory ENCODER_FACTORY = EncoderFactory.get();

private InterceptorManager interceptorManager;
private Schema.Parser parser;
private AutoCloseable mocksCloseable;
private TestInfo testInfo;

@@ -108,7 +107,6 @@ public void beforeEach(TestInfo testInfo) {

this.testInfo = testInfo;
this.mocksCloseable = MockitoAnnotations.openMocks(this);
this.parser = new Schema.Parser();
}

@AfterEach
@@ -125,7 +123,7 @@ public void afterEach() throws Exception {
@Test
public void testRegistryGuidPrefixedToPayload() {
// manually add SchemaRegistryObject into mock registry client cache
final AvroSerializer avroSerializer = new AvroSerializer(false, new Schema.Parser(),
final AvroSerializer avroSerializer = new AvroSerializer(false,
ENCODER_FACTORY, DECODER_FACTORY);
final PlayingCard playingCard = new PlayingCard(true, 10, PlayingCardSuit.DIAMONDS);
final Schema playingClassSchema = PlayingCard.getClassSchema();
@@ -152,7 +150,7 @@ public void testRegistryGuidPrefixedToPayload() {
@Test
public void testNullPayloadThrowsSerializationException() {
// Arrange
final AvroSerializer avroSerializer = new AvroSerializer(false, parser, ENCODER_FACTORY,
final AvroSerializer avroSerializer = new AvroSerializer(false, ENCODER_FACTORY,
DECODER_FACTORY);
final SerializerOptions serializerOptions = new SerializerOptions(MOCK_SCHEMA_GROUP, false, MOCK_CACHE_SIZE);
final SchemaRegistryApacheAvroSerializer encoder = new SchemaRegistryApacheAvroSerializer(client, avroSerializer,
@@ -168,7 +166,7 @@ public void testNullPayloadThrowsSerializationException() {
@Test
public void testIfRegistryNullThenThrow() {
// Arrange
AvroSerializer encoder = new AvroSerializer(false, parser, ENCODER_FACTORY,
AvroSerializer encoder = new AvroSerializer(false, ENCODER_FACTORY,
DECODER_FACTORY);
final SerializerOptions serializerOptions = new SerializerOptions(MOCK_SCHEMA_GROUP, false, MOCK_CACHE_SIZE);

@@ -180,7 +178,7 @@ public void testIfRegistryNullThenThrow() {
@Test
void testGetSchemaAndDeserialize() throws IOException {
// manually add SchemaRegistryObject to cache
final AvroSerializer decoder = new AvroSerializer(false, parser, ENCODER_FACTORY,
final AvroSerializer decoder = new AvroSerializer(false, ENCODER_FACTORY,
DECODER_FACTORY);
final PlayingCard playingCard = new PlayingCard(true, 10, PlayingCardSuit.DIAMONDS);
final String playingClassSchema = PlayingCard.getClassSchema().toString();
@@ -213,6 +211,15 @@ void testGetSchemaAndDeserialize() throws IOException {
assertEquals(playingCard.getIsFaceCard(), actual.getIsFaceCard());
})
.verifyComplete();

// Deserializing the same message again should work.
StepVerifier.create(encoder.deserializeMessageDataAsync(message, TypeReference.createInstance(PlayingCard.class)))
.assertNext(actual -> {
assertEquals(playingCard.getPlayingCardSuit(), actual.getPlayingCardSuit());
assertEquals(playingCard.getCardValue(), actual.getCardValue());
assertEquals(playingCard.getIsFaceCard(), actual.getIsFaceCard());
})
.verifyComplete();
}

public static Stream<Arguments> testEmptyPayload() {
@@ -230,7 +237,7 @@ public static Stream<Arguments> testEmptyPayload() {
@ParameterizedTest
public void testEmptyPayload(MessageWithMetadata message) {
// Arrange
final AvroSerializer avroSerializer = new AvroSerializer(false, parser, ENCODER_FACTORY, DECODER_FACTORY);
final AvroSerializer avroSerializer = new AvroSerializer(false, ENCODER_FACTORY, DECODER_FACTORY);
final SerializerOptions serializerOptions = new SerializerOptions(MOCK_SCHEMA_GROUP, true, MOCK_CACHE_SIZE);

final SchemaRegistryApacheAvroSerializer encoder = new SchemaRegistryApacheAvroSerializer(client,
@@ -248,7 +255,7 @@ public void testEmptyPayload(MessageWithMetadata message) {
@Test
public void testEmptyPayloadSync() {
// Arrange
final AvroSerializer avroSerializer = new AvroSerializer(false, parser, ENCODER_FACTORY, DECODER_FACTORY);
final AvroSerializer avroSerializer = new AvroSerializer(false, ENCODER_FACTORY, DECODER_FACTORY);
final SerializerOptions serializerOptions = new SerializerOptions(MOCK_SCHEMA_GROUP, true, MOCK_CACHE_SIZE);
final MockMessage message = new MockMessage();

@@ -267,7 +274,7 @@ public void testEmptyPayloadSync() {
*/
@Test
public void testNullPayload() {
final AvroSerializer avroSerializer = new AvroSerializer(false, parser, ENCODER_FACTORY, DECODER_FACTORY);
final AvroSerializer avroSerializer = new AvroSerializer(false, ENCODER_FACTORY, DECODER_FACTORY);
final SerializerOptions serializerOptions = new SerializerOptions(MOCK_SCHEMA_GROUP, true, MOCK_CACHE_SIZE);

final SchemaRegistryApacheAvroSerializer encoder = new SchemaRegistryApacheAvroSerializer(client,
@@ -284,7 +291,7 @@ public void testNullPayload() {
*/
@Test
public void testNullPayloadSync() {
final AvroSerializer avroSerializer = new AvroSerializer(false, parser, ENCODER_FACTORY, DECODER_FACTORY);
final AvroSerializer avroSerializer = new AvroSerializer(false, ENCODER_FACTORY, DECODER_FACTORY);
final SerializerOptions serializerOptions = new SerializerOptions(MOCK_SCHEMA_GROUP, true, MOCK_CACHE_SIZE);

SchemaRegistryApacheAvroSerializer encoder = new SchemaRegistryApacheAvroSerializer(
@@ -394,7 +401,7 @@ public void serializeForwardCompatibility() {
@Test
public void throwsWhenConstructorNotAvailable() {
// Arrange
final AvroSerializer avroSerializer = new AvroSerializer(false, parser, ENCODER_FACTORY, DECODER_FACTORY);
final AvroSerializer avroSerializer = new AvroSerializer(false, ENCODER_FACTORY, DECODER_FACTORY);
final SerializerOptions serializerOptions = new SerializerOptions(MOCK_SCHEMA_GROUP, true, MOCK_CACHE_SIZE);
final SchemaRegistryApacheAvroSerializer encoder = new SchemaRegistryApacheAvroSerializer(
client, avroSerializer, serializerOptions);
@@ -421,7 +428,7 @@ public void throwsWhenConstructorNotAvailable() {
.build();
final SchemaRegistrySchema schemaResponse = new SchemaRegistrySchema(
new SchemaProperties(MOCK_GUID, SchemaFormat.AVRO), expected.getSchema().toString());
final AvroSerializer avroSerializer = new AvroSerializer(true, parser, ENCODER_FACTORY,
final AvroSerializer avroSerializer = new AvroSerializer(true, ENCODER_FACTORY,
DECODER_FACTORY);
final SerializerOptions serializerOptions = new SerializerOptions(MOCK_SCHEMA_GROUP, true, MOCK_CACHE_SIZE);
final SchemaRegistryApacheAvroSerializer encoder = new SchemaRegistryApacheAvroSerializer(client, avroSerializer,
@@ -497,11 +504,9 @@ private SchemaRegistryAsyncClient getSchemaRegistryClient(TestInfo testInfo, Tes
tokenCredential = mock(TokenCredential.class);

// Sometimes it throws a "NotAMockException", so we had to change from thenReturn to thenAnswer.
when(tokenCredential.getToken(any(TokenRequestContext.class))).thenAnswer(invocationOnMock -> {
return Mono.fromCallable(() -> {
return new AccessToken("foo", OffsetDateTime.now().plusMinutes(20));
});
});
when(tokenCredential.getToken(any(TokenRequestContext.class)))
.thenAnswer(invocationOnMock -> Mono.fromCallable(() ->
new AccessToken("foo", OffsetDateTime.now().plusMinutes(20))));

endpoint = PLAYBACK_ENDPOINT;
} else {

