diff --git a/protobuf-serializer/src/main/java/io/confluent/kafka/serializers/protobuf/AbstractKafkaProtobufDeserializer.java b/protobuf-serializer/src/main/java/io/confluent/kafka/serializers/protobuf/AbstractKafkaProtobufDeserializer.java index 244f520465a..ad07eb06e48 100644 --- a/protobuf-serializer/src/main/java/io/confluent/kafka/serializers/protobuf/AbstractKafkaProtobufDeserializer.java +++ b/protobuf-serializer/src/main/java/io/confluent/kafka/serializers/protobuf/AbstractKafkaProtobufDeserializer.java @@ -21,7 +21,6 @@ import com.google.protobuf.DynamicMessage; import com.google.protobuf.ExtensionRegistryLite; import com.google.protobuf.Message; -import io.confluent.kafka.schemaregistry.ParsedSchema; import io.confluent.kafka.schemaregistry.client.rest.entities.RuleMode; import io.confluent.kafka.schemaregistry.utils.BoundedConcurrentHashMap; import java.io.InterruptedIOException; @@ -146,11 +145,14 @@ protected Object deserialize( schema = schemaForDeserialize(id, schema, subject, isKey); } - ParsedSchema readerSchema = null; + ProtobufSchema readerSchema = null; if (metadata != null) { - readerSchema = getLatestWithMetadata(subject).getSchema(); + readerSchema = (ProtobufSchema) getLatestWithMetadata(subject).getSchema(); } else if (useLatestVersion) { - readerSchema = lookupLatestVersion(subject, schema, false).getSchema(); + readerSchema = (ProtobufSchema) lookupLatestVersion(subject, schema, false).getSchema(); + } + if (readerSchema != null && readerSchema.toDescriptor(name) != null) { + readerSchema = schemaWithName(readerSchema, name); } if (includeSchemaAndVersion || readerSchema != null) { Integer version = schemaVersion(topic, isKey, id, subject, schema, null); @@ -175,7 +177,7 @@ protected Object deserialize( } if (readerSchema != null) { - schema = (ProtobufSchema) readerSchema; + schema = readerSchema; } if (schema.ruleSet() != null && schema.ruleSet().hasRules(RuleMode.READ)) { if (message == null) { diff --git a/schema-rules/src/test/java/io/confluent/kafka/schemaregistry/rules/jsonata/JsonataExecutorTest.java b/schema-rules/src/test/java/io/confluent/kafka/schemaregistry/rules/jsonata/JsonataExecutorTest.java index a2600871d40..bd24a63b957 100644 --- a/schema-rules/src/test/java/io/confluent/kafka/schemaregistry/rules/jsonata/JsonataExecutorTest.java +++ b/schema-rules/src/test/java/io/confluent/kafka/schemaregistry/rules/jsonata/JsonataExecutorTest.java @@ -51,6 +51,7 @@ import io.confluent.kafka.schemaregistry.rules.NewSpecificWidget; import io.confluent.kafka.schemaregistry.rules.NewWidgetProto; import io.confluent.kafka.schemaregistry.rules.WidgetProto; +import io.confluent.kafka.schemaregistry.rules.WidgetProto.Pii; import io.confluent.kafka.schemaregistry.rules.WidgetProto.Widget; import io.confluent.kafka.schemaregistry.rules.cel.CelFieldExecutor; import io.confluent.kafka.serializers.KafkaAvroDeserializer; @@ -88,6 +89,7 @@ public class JsonataExecutorTest { private final KafkaAvroDeserializer specificAvroDeserializer; private final KafkaProtobufSerializer protobufSerializer; private final KafkaProtobufSerializer protobufSerializer2; + private final KafkaProtobufSerializer protobufSerializer3; private final KafkaProtobufDeserializer protobufDeserializer; private final KafkaProtobufDeserializer specificProtobufDeserializer; private final KafkaJsonSchemaSerializer jsonSchemaSerializer; @@ -140,6 +142,7 @@ public JsonataExecutorTest() { protobufSerializer = new KafkaProtobufSerializer<>(schemaRegistry, defaultConfig); protobufSerializer2 = new KafkaProtobufSerializer<>(schemaRegistry, defaultConfig); + protobufSerializer3 = new KafkaProtobufSerializer<>(schemaRegistry, defaultConfig); protobufDeserializer = new KafkaProtobufDeserializer<>(schemaRegistry, defaultConfig2); specificProps2.put(KafkaProtobufDeserializerConfig.DERIVE_TYPE_CONFIG, "true"); @@ -571,6 +574,45 @@ public void testKafkaProtobufSerializerSpecific() throws Exception { ); } + @Test + public void testKafkaProtobufSerializerGenericWithSecondMessage() throws Exception { + byte[] bytes; + Object obj; + + String ruleString = "$sift($, function($v, $k) {$k != 'size'})"; + + Pii pii = WidgetProto.Pii.newBuilder().setPii("secret").build(); + ProtobufSchema protobufSchema = new ProtobufSchema(pii.getDescriptorForType()); + SortedMap props = ImmutableSortedMap.of("application.version", "v1"); + Metadata metadata = new Metadata(Collections.emptySortedMap(), props, Collections.emptySortedSet()); + protobufSchema = protobufSchema.copy(metadata, null); + schemaRegistry.register(topic + "-value", protobufSchema); + + protobufSchema = new ProtobufSchema(NewWidgetProto.Pii.getDescriptor()); + Rule rule = new Rule("myRule", null, RuleKind.TRANSFORM, RuleMode.UPGRADE, + JsonataExecutor.TYPE, null, null, ruleString, null, null, false); + RuleSet ruleSet = new RuleSet(Collections.singletonList(rule), Collections.emptyList()); + props = ImmutableSortedMap.of("application.version", "v2"); + metadata = new Metadata(Collections.emptySortedMap(), props, Collections.emptySortedSet()); + protobufSchema = protobufSchema.copy(metadata, ruleSet); + schemaRegistry.register(topic + "-value", protobufSchema); + + bytes = protobufSerializer3.serialize(topic, pii); + + obj = protobufDeserializer.deserialize(topic, bytes); + assertTrue( + "Returned object should be a Pii", + DynamicMessage.class.isInstance(obj) + ); + DynamicMessage dynamicMessage = (DynamicMessage) obj; + Descriptor dynamicDesc = dynamicMessage.getDescriptorForType(); + assertEquals( + "Returned object should be a Pii", + "secret", + ((DynamicMessage)obj).getField(dynamicDesc.findFieldByName("pii")) + ); + } + @Test public void testKafkaProtobufCondition() throws Exception { String ruleString = "$toMillis($.expiration) > $millis()";