Commit 19fe1b7

Merge branch '7.6.x' into 7.7.x

Authored by rayokota; committed by ConfluentSemaphore on Nov 8, 2024
2 parents: 5745130 + ccd90e0
Showing 2 changed files with 49 additions and 5 deletions.
@@ -21,7 +21,6 @@
 import com.google.protobuf.DynamicMessage;
 import com.google.protobuf.ExtensionRegistryLite;
 import com.google.protobuf.Message;
-import io.confluent.kafka.schemaregistry.ParsedSchema;
 import io.confluent.kafka.schemaregistry.client.rest.entities.RuleMode;
 import io.confluent.kafka.schemaregistry.utils.BoundedConcurrentHashMap;
 import java.io.InterruptedIOException;
@@ -146,11 +145,14 @@ protected Object deserialize(
       schema = schemaForDeserialize(id, schema, subject, isKey);
     }
 
-    ParsedSchema readerSchema = null;
+    ProtobufSchema readerSchema = null;
     if (metadata != null) {
-      readerSchema = getLatestWithMetadata(subject).getSchema();
+      readerSchema = (ProtobufSchema) getLatestWithMetadata(subject).getSchema();
     } else if (useLatestVersion) {
-      readerSchema = lookupLatestVersion(subject, schema, false).getSchema();
+      readerSchema = (ProtobufSchema) lookupLatestVersion(subject, schema, false).getSchema();
     }
+    if (readerSchema != null && readerSchema.toDescriptor(name) != null) {
+      readerSchema = schemaWithName(readerSchema, name);
+    }
     if (includeSchemaAndVersion || readerSchema != null) {
       Integer version = schemaVersion(topic, isKey, id, subject, schema, null);
@@ -175,7 +177,7 @@ protected Object deserialize(
     }
 
     if (readerSchema != null) {
-      schema = (ProtobufSchema) readerSchema;
+      schema = readerSchema;
     }
     if (schema.ruleSet() != null && schema.ruleSet().hasRules(RuleMode.READ)) {
       if (message == null) {
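The first file narrows readerSchema from ParsedSchema to ProtobufSchema and, when the writer's message name resolves against the reader schema, re-points the reader schema at that message. A minimal sketch of that guard follows, assuming schemaWithName(schema, name) behaves like ProtobufSchema.copy(name); the helper's body is outside this hunk, so that equivalence is an assumption.

import com.google.protobuf.Descriptors.Descriptor;
import io.confluent.kafka.schemaregistry.protobuf.ProtobufSchema;

public class ReaderSchemaPinning {
  // Sketch of the added guard: only re-target the reader schema when it
  // actually defines a message with the writer's fully qualified name.
  static ProtobufSchema pinToWriterMessage(ProtobufSchema readerSchema, String name) {
    if (readerSchema == null) {
      return null;
    }
    // toDescriptor(name) returns null when the reader schema has no
    // message type with that name (e.g. a multi-message .proto).
    Descriptor target = readerSchema.toDescriptor(name);
    if (target != null) {
      // Assumption: schemaWithName delegates to copy(name), which returns
      // a copy of the schema whose default message is `name`.
      return readerSchema.copy(name);
    }
    return readerSchema;
  }
}

The guard matters for schemas that define more than one message: without it, the reader schema would keep its default (first) message type even when the writer serialized a different one.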
@@ -51,6 +51,7 @@
 import io.confluent.kafka.schemaregistry.rules.NewSpecificWidget;
 import io.confluent.kafka.schemaregistry.rules.NewWidgetProto;
 import io.confluent.kafka.schemaregistry.rules.WidgetProto;
+import io.confluent.kafka.schemaregistry.rules.WidgetProto.Pii;
 import io.confluent.kafka.schemaregistry.rules.WidgetProto.Widget;
 import io.confluent.kafka.schemaregistry.rules.cel.CelFieldExecutor;
 import io.confluent.kafka.serializers.KafkaAvroDeserializer;
@@ -88,6 +89,7 @@ public class JsonataExecutorTest {
   private final KafkaAvroDeserializer specificAvroDeserializer;
   private final KafkaProtobufSerializer<WidgetProto.Widget> protobufSerializer;
   private final KafkaProtobufSerializer<ExpiringSpecificWidgetProto.ExpiringSpecificWidget> protobufSerializer2;
+  private final KafkaProtobufSerializer<WidgetProto.Pii> protobufSerializer3;
   private final KafkaProtobufDeserializer<DynamicMessage> protobufDeserializer;
   private final KafkaProtobufDeserializer<NewWidgetProto.NewWidget> specificProtobufDeserializer;
   private final KafkaJsonSchemaSerializer<OldWidget> jsonSchemaSerializer;
@@ -140,6 +142,7 @@ public JsonataExecutorTest() {
 
     protobufSerializer = new KafkaProtobufSerializer<>(schemaRegistry, defaultConfig);
     protobufSerializer2 = new KafkaProtobufSerializer<>(schemaRegistry, defaultConfig);
+    protobufSerializer3 = new KafkaProtobufSerializer<>(schemaRegistry, defaultConfig);
     protobufDeserializer = new KafkaProtobufDeserializer<>(schemaRegistry, defaultConfig2);
 
     specificProps2.put(KafkaProtobufDeserializerConfig.DERIVE_TYPE_CONFIG, "true");
@@ -571,6 +574,45 @@ public void testKafkaProtobufSerializerSpecific() throws Exception {
     );
   }
 
+  @Test
+  public void testKafkaProtobufSerializerGenericWithSecondMessage() throws Exception {
+    byte[] bytes;
+    Object obj;
+
+    String ruleString = "$sift($, function($v, $k) {$k != 'size'})";
+
+    Pii pii = WidgetProto.Pii.newBuilder().setPii("secret").build();
+    ProtobufSchema protobufSchema = new ProtobufSchema(pii.getDescriptorForType());
+    SortedMap<String, String> props = ImmutableSortedMap.of("application.version", "v1");
+    Metadata metadata = new Metadata(Collections.emptySortedMap(), props, Collections.emptySortedSet());
+    protobufSchema = protobufSchema.copy(metadata, null);
+    schemaRegistry.register(topic + "-value", protobufSchema);
+
+    protobufSchema = new ProtobufSchema(NewWidgetProto.Pii.getDescriptor());
+    Rule rule = new Rule("myRule", null, RuleKind.TRANSFORM, RuleMode.UPGRADE,
+        JsonataExecutor.TYPE, null, null, ruleString, null, null, false);
+    RuleSet ruleSet = new RuleSet(Collections.singletonList(rule), Collections.emptyList());
+    props = ImmutableSortedMap.of("application.version", "v2");
+    metadata = new Metadata(Collections.emptySortedMap(), props, Collections.emptySortedSet());
+    protobufSchema = protobufSchema.copy(metadata, ruleSet);
+    schemaRegistry.register(topic + "-value", protobufSchema);
+
+    bytes = protobufSerializer3.serialize(topic, pii);
+
+    obj = protobufDeserializer.deserialize(topic, bytes);
+    assertTrue(
+        "Returned object should be a Pii",
+        DynamicMessage.class.isInstance(obj)
+    );
+    DynamicMessage dynamicMessage = (DynamicMessage) obj;
+    Descriptor dynamicDesc = dynamicMessage.getDescriptorForType();
+    assertEquals(
+        "Returned object should be a Pii",
+        "secret",
+        ((DynamicMessage) obj).getField(dynamicDesc.findFieldByName("pii"))
+    );
+  }
+
   @Test
   public void testKafkaProtobufCondition() throws Exception {
     String ruleString = "$toMillis($.expiration) > $millis()";
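The new test registers two versions of the Pii schema under the same subject — v1 with metadata application.version=v1 and no rules, v2 carrying a JSONata UPGRADE rule — then serializes with the v1 writer and expects the deserializer to migrate the record on read. A hedged sketch of the deserializer side, assuming defaultConfig2 enables latest-version resolution (its contents are outside this diff); the mock URL and the exact property pairing are assumptions, though the config keys are the standard serde settings.

import com.google.protobuf.DynamicMessage;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import io.confluent.kafka.serializers.protobuf.KafkaProtobufDeserializer;
import java.util.HashMap;
import java.util.Map;

public class UpgradeAwareConsumerSketch {
  // schemaRegistry is the same (mock) client the serializer registered against.
  static KafkaProtobufDeserializer<DynamicMessage> upgradeAwareDeserializer(
      SchemaRegistryClient schemaRegistry) {
    Map<String, Object> props = new HashMap<>();
    props.put("schema.registry.url", "mock://jsonata");  // placeholder URL (assumption)
    // Resolve the reader schema to the latest registered version (v2),
    // which carries the JSONata UPGRADE rule applied during migration.
    props.put("use.latest.version", "true");
    // Alternative: select the reader schema by metadata instead of "latest"
    // (assumed pairing with the test's application.version metadata):
    // props.put("use.latest.with.metadata", "application.version=v2");
    return new KafkaProtobufDeserializer<>(schemaRegistry, props);
  }
}

Note the rule string sifts out a key named 'size', which the Pii message does not have, so for this payload the migration is a pass-through; the assertion only checks that the 'pii' field survives the round trip as a DynamicMessage.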
