Add rule client functionality (#2402)
* Add rule client with executors

* Incorporate review feedback

* Fix test

* Fix test

* Fix for schema-registry-plugins
rayokota authored Nov 1, 2022
1 parent 996a0e5 commit 85c88b4
Showing 69 changed files with 9,471 additions and 258 deletions.
@@ -31,6 +31,7 @@
 import org.apache.kafka.common.config.ConfigException;
 import org.apache.kafka.common.errors.InvalidConfigurationException;
 import org.apache.kafka.common.errors.SerializationException;
+import org.apache.kafka.common.header.Headers;
 import org.apache.kafka.connect.data.Schema;
 import org.apache.kafka.connect.data.SchemaAndValue;
 import org.apache.kafka.connect.errors.DataException;
@@ -81,11 +82,17 @@ public void configure(Map<String, ?> configs, boolean isKey) {
 
   @Override
   public byte[] fromConnectData(String topic, Schema schema, Object value) {
+    return fromConnectData(topic, null, schema, value);
+  }
+
+  @Override
+  public byte[] fromConnectData(String topic, Headers headers, Schema schema, Object value) {
     try {
       org.apache.avro.Schema avroSchema = avroData.fromConnectSchema(schema);
       return serializer.serialize(
           topic,
           isKey,
+          headers,
           avroData.fromConnectData(schema, avroSchema, value),
           new AvroSchema(avroSchema));
     } catch (SerializationException e) {
@@ -102,9 +109,14 @@ public byte[] fromConnectData(String topic, Schema schema, Object value) {
 
   @Override
   public SchemaAndValue toConnectData(String topic, byte[] value) {
+    return toConnectData(topic, null, value);
+  }
+
+  @Override
+  public SchemaAndValue toConnectData(String topic, Headers headers, byte[] value) {
     try {
       GenericContainerWithVersion containerWithVersion =
-          deserializer.deserialize(topic, isKey, value);
+          deserializer.deserialize(topic, isKey, headers, value);
       if (containerWithVersion == null) {
         return SchemaAndValue.NULL;
       }
@@ -146,12 +158,14 @@ public Serializer(Map<String, ?> configs, SchemaRegistryClient client) {
     }
 
     public byte[] serialize(
-        String topic, boolean isKey, Object value, AvroSchema schema) {
+        String topic, boolean isKey, Headers headers, Object value, AvroSchema schema) {
       if (value == null) {
         return null;
       }
       return serializeImpl(
           getSubjectName(topic, isKey, value, schema),
+          topic,
+          headers,
           value,
           schema);
     }
@@ -168,8 +182,9 @@ public Deserializer(Map<String, ?> configs, SchemaRegistryClient client) {
       configure(new KafkaAvroDeserializerConfig(configs));
     }
 
-    public GenericContainerWithVersion deserialize(String topic, boolean isKey, byte[] payload) {
-      return deserializeWithSchemaAndVersion(topic, isKey, payload);
+    public GenericContainerWithVersion deserialize(
+        String topic, boolean isKey, Headers headers, byte[] payload) {
+      return deserializeWithSchemaAndVersion(topic, isKey, headers, payload);
     }
   }
 }
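
For orientation, a minimal sketch of calling the new header-aware Converter overloads above. This is not part of the commit: the class name, topic, and registry URL are illustrative, and a reachable Schema Registry at the configured URL is assumed.

// Hypothetical usage sketch; requires a running Schema Registry at the URL below.
import io.confluent.connect.avro.AvroConverter;
import java.util.Collections;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaAndValue;

public class HeaderAwareConverterSketch {
  public static void main(String[] args) {
    AvroConverter converter = new AvroConverter();
    converter.configure(
        Collections.singletonMap("schema.registry.url", "http://localhost:8081"),
        false); // configure as a value converter

    Headers headers = new RecordHeaders();
    // Headers now flow through to the serializer, where rules can consume them.
    byte[] bytes = converter.fromConnectData("orders", headers, Schema.STRING_SCHEMA, "hi");
    SchemaAndValue roundTrip = converter.toConnectData("orders", headers, bytes);
    System.out.println(roundTrip.value()); // "hi"
  }
}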

@@ -169,7 +169,7 @@ public byte[] serialize(
       Object object,
       ParsedSchema schema
   ) {
-    return super.serializeImpl(subject, object, (AvroSchema) schema);
+    return super.serializeImpl(subject, topic, null, object, (AvroSchema) schema);
     }
   }
 }

@@ -16,9 +16,13 @@
 
 package io.confluent.kafka.serializers;
 
+import com.fasterxml.jackson.databind.JsonNode;
 import com.google.common.cache.CacheBuilder;
 import com.google.common.cache.CacheLoader;
 import com.google.common.cache.LoadingCache;
+import io.confluent.kafka.schemaregistry.client.rest.entities.RuleMode;
+import java.util.Collections;
+import java.util.List;
 import java.util.Properties;
 import java.util.concurrent.ExecutionException;
 import org.apache.avro.Schema;
@@ -46,10 +50,10 @@
 import io.confluent.kafka.schemaregistry.avro.AvroSchemaUtils;
 import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException;
 import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.common.header.Headers;
 
 public abstract class AbstractKafkaAvroDeserializer extends AbstractKafkaSchemaSerDe {
   private final DecoderFactory decoderFactory = DecoderFactory.get();
-  protected boolean isKey;
   protected boolean useSpecificAvroReader = false;
   protected Schema specificAvroReaderSchema = null;
   protected boolean avroReflectionAllowNull = false;
@@ -160,7 +164,12 @@ protected Object deserialize(byte[] payload, Schema readerSchema) throws SerializationException {
   }
 
   protected Object deserialize(String topic, Boolean isKey, byte[] payload, Schema readerSchema)
       throws SerializationException {
+    return deserialize(topic, isKey, null, payload, readerSchema);
+  }
+
+  protected Object deserialize(String topic, Boolean isKey, Headers headers,
+      byte[] payload, Schema readerSchema) throws SerializationException {
     if (schemaRegistry == null) {
       throw new InvalidConfigurationException(
           "SchemaRegistryClient not found. You need to configure the deserializer "
@@ -170,23 +179,37 @@ protected Object deserialize(String topic, Boolean isKey, byte[] payload, Schema readerSchema)
       return null;
     }
 
-    DeserializationContext context = new DeserializationContext(topic, isKey, payload);
-    return context.read(context.schemaFromRegistry().rawSchema(), readerSchema);
+    DeserializationContext context = new DeserializationContext(topic, isKey, headers, payload);
+    return context.read(context.schemaFromRegistry(),
+        readerSchema != null ? new AvroSchema(readerSchema) : null);
   }
 
   private Integer schemaVersion(String topic,
                                 boolean isKey,
                                 int id,
                                 String subject,
                                 AvroSchema schema,
-                                Object result) throws IOException, RestClientException {
-    Integer version;
-    if (isDeprecatedSubjectNameStrategy(isKey)) {
-      subject = getSubjectName(topic, isKey, result, schema);
-    }
-    AvroSchema subjectSchema = (AvroSchema) schemaRegistry.getSchemaBySubjectAndId(subject, id);
-    version = schemaRegistry.getVersion(subject, subjectSchema);
-    return version;
+                                Object result) {
+    try {
+      Integer version;
+      if (isDeprecatedSubjectNameStrategy(isKey)) {
+        subject = getSubjectName(topic, isKey, result, schema);
+      }
+      AvroSchema subjectSchema = (AvroSchema) schemaRegistry.getSchemaBySubjectAndId(subject, id);
+      version = schemaRegistry.getVersion(subject, subjectSchema);
+      return version;
+    } catch (IOException e) {
+      throw new SerializationException("Error retrieving Avro "
+          + getSchemaType(isKey)
+          + " schema version for id "
+          + id, e);
+    } catch (RestClientException e) {
+      String errorMessage = "Error retrieving Avro "
+          + getSchemaType(isKey)
+          + " schema version for id "
+          + id;
+      throw toKafkaException(e, errorMessage);
+    }
   }
 
   private String subjectName(String topic, boolean isKey, AvroSchema schemaFromRegistry) {
@@ -206,6 +229,12 @@ private String subjectName(String topic, boolean isKey, AvroSchema schemaFromRegistry) {
   protected GenericContainerWithVersion deserializeWithSchemaAndVersion(
       String topic, boolean isKey, byte[] payload)
       throws SerializationException, InvalidConfigurationException {
+    return deserializeWithSchemaAndVersion(topic, isKey, null, payload);
+  }
+
+  protected GenericContainerWithVersion deserializeWithSchemaAndVersion(
+      String topic, boolean isKey, Headers headers, byte[] payload)
+      throws SerializationException, InvalidConfigurationException {
     // Even if the caller requests schema & version, if the payload is null we cannot include it.
     // The caller must handle this case.
     if (payload == null) {
@@ -221,30 +250,18 @@ protected GenericContainerWithVersion deserializeWithSchemaAndVersion(
     // Converter to let a version provided by a Kafka Connect source take priority over the
     // schema registry's ordering (which is implicit by auto-registration time rather than
     // explicit from the Connector).
-    DeserializationContext context = new DeserializationContext(topic, isKey, payload);
+    DeserializationContext context = new DeserializationContext(topic, isKey, headers, payload);
     AvroSchema schema = context.schemaForDeserialize();
-    Object result = context.read(schema.rawSchema(), specificAvroReaderSchema);
+    Object result = context.read(schema,
+        specificAvroReaderSchema != null ? new AvroSchema(specificAvroReaderSchema) : null);
 
-    try {
-      Integer version = schemaVersion(topic, isKey, context.getSchemaId(),
-          context.getSubject(), schema, result);
-      if (schema.rawSchema().getType().equals(Schema.Type.RECORD)) {
-        return new GenericContainerWithVersion((GenericContainer) result, version);
-      } else {
-        return new GenericContainerWithVersion(new NonRecordContainer(schema.rawSchema(), result),
-            version);
-      }
-    } catch (IOException e) {
-      throw new SerializationException("Error retrieving Avro "
-          + getSchemaType(isKey)
-          + " schema version for id "
-          + context.getSchemaId(), e);
-    } catch (RestClientException e) {
-      String errorMessage = "Error retrieving Avro "
-          + getSchemaType(isKey)
-          + " schema version for id "
-          + context.getSchemaId();
-      throw toKafkaException(e, errorMessage);
+    Integer version = schemaVersion(topic, isKey, context.getSchemaId(),
+        context.getSubject(), schema, result);
+    if (schema.rawSchema().getType().equals(Schema.Type.RECORD)) {
+      return new GenericContainerWithVersion((GenericContainer) result, version);
+    } else {
+      return new GenericContainerWithVersion(new NonRecordContainer(schema.rawSchema(), result),
+          version);
     }
   }
 
@@ -347,12 +364,15 @@ private Schema getReflectionReaderSchema(Schema writerSchema) {
   class DeserializationContext {
     private final String topic;
     private final Boolean isKey;
+    private final Headers headers;
     private final ByteBuffer buffer;
     private final int schemaId;
 
-    DeserializationContext(final String topic, final Boolean isKey, final byte[] payload) {
+    DeserializationContext(
+        final String topic, final Boolean isKey, Headers headers, final byte[] payload) {
      this.topic = topic;
      this.isKey = isKey;
+      this.headers = headers;
      this.buffer = getByteBuffer(payload);
      this.schemaId = buffer.getInt();
    }
@@ -417,32 +437,75 @@ int getSchemaId() {
       return schemaId;
     }
 
-    Object read(Schema writerSchema) {
-      return read(writerSchema, specificAvroReaderSchema);
+    Object read(AvroSchema writerAvroSchema) {
+      return read(writerAvroSchema,
+          specificAvroReaderSchema != null ? new AvroSchema(specificAvroReaderSchema) : null);
     }
 
-    Object read(Schema writerSchema, Schema readerSchema) {
+    Object read(AvroSchema writerAvroSchema, AvroSchema readerAvroSchema) {
       try {
-        DatumReader<?> reader = getDatumReader(writerSchema, readerSchema);
+        List<Migration> migrations = Collections.emptyList();
+        if (readerAvroSchema == null) {
+          readerAvroSchema = (AvroSchema) getLatestWithMetadata(getSubject());
+          if (readerAvroSchema != null) {
+            // set version on the writer schema
+            writerAvroSchema = schemaForDeserialize();
+            Integer version = schemaVersion(
+                topic, isKey, schemaId, getSubject(), writerAvroSchema, null);
+            writerAvroSchema = writerAvroSchema.copy(version);
+
+            migrations = getMigrations(getSubject(), writerAvroSchema, readerAvroSchema);
+          }
+        }
+
+        Schema writerSchema = writerAvroSchema.rawSchema();
+        Schema readerSchema = readerAvroSchema != null ? readerAvroSchema.rawSchema() : null;
+        DatumReader<?> reader;
+        if (!migrations.isEmpty()) {
+          // if migration is required, then initially use GenericDatumReader
+          reader = new GenericDatumReader<>(writerSchema);
+        } else {
+          reader = getDatumReader(writerSchema, readerSchema);
+        }
         int length = buffer.limit() - 1 - idSize;
+        Object result;
         if (writerSchema.getType().equals(Schema.Type.BYTES)) {
           byte[] bytes = new byte[length];
           buffer.get(bytes, 0, length);
-          return bytes;
+          result = bytes;
         } else {
           int start = buffer.position() + buffer.arrayOffset();
-          Object result = reader.read(null, decoderFactory.binaryDecoder(buffer.array(),
+          result = reader.read(null, decoderFactory.binaryDecoder(buffer.array(),
               start, length, null));
           if (writerSchema.getType().equals(Schema.Type.STRING)) {
-            return result.toString();
-          } else {
-            return result;
+            result = result.toString();
           }
         }
+
+        // First apply migration rules
+        if (!migrations.isEmpty()) {
+          result = executeMigrations(migrations, getSubject(), topic, headers, result);
+        }
+
+        if (readerAvroSchema == null) {
+          readerAvroSchema = writerAvroSchema;
+        }
+        if (result instanceof JsonNode) {
+          reader = getDatumReader(readerAvroSchema.rawSchema(), readerAvroSchema.rawSchema());
+          result = AvroSchemaUtils.toObject(
+              (JsonNode) result, readerAvroSchema, (DatumReader<Object>) reader);
+        }
+
+        // Next apply domain rules
+        result = executeRules(
            getSubject(), topic, headers, RuleMode.READ, null, readerAvroSchema, result
        );
+
+        return result;
       } catch (ExecutionException ex) {
         throw new SerializationException("Error deserializing Avro message for id "
             + schemaId, ex.getCause());
-      } catch (IOException | RuntimeException e) {
+      } catch (RestClientException | IOException | RuntimeException e) {
         // avro deserialization may throw AvroRuntimeException, NullPointerException, etc
         throw new SerializationException("Error deserializing Avro message for id "
             + schemaId, e);
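
Condensed, the new read path in DeserializationContext.read above does four things in order: decode with the writer schema (falling back to a plain GenericDatumReader when migrations apply), run migration rules, materialize any JsonNode intermediate back into an Avro object against the reader schema, and only then run READ-mode domain rules. A sketch of that ordering follows; it leans on the surrounding class's members (topic, headers, getSubject(), getMigrations(), executeMigrations(), executeRules()) and uses a hypothetical decode(...) helper in place of the ByteBuffer plumbing, so it is a restatement, not a drop-in method.

// Sketch only: decode(...) is a hypothetical stand-in; everything else
// mirrors the ordering in the diff above.
Object readWithRules(AvroSchema writerSchema, AvroSchema readerSchema, ByteBuffer payload)
    throws Exception {
  List<Migration> migrations = getMigrations(getSubject(), writerSchema, readerSchema);
  DatumReader<?> datumReader = migrations.isEmpty()
      ? getDatumReader(writerSchema.rawSchema(), readerSchema.rawSchema())
      : new GenericDatumReader<>(writerSchema.rawSchema()); // generic form migrates more easily
  Object result = decode(datumReader, payload); // hypothetical buffer handling

  // 1) Migration rules first, moving the value toward the reader schema's shape
  if (!migrations.isEmpty()) {
    result = executeMigrations(migrations, getSubject(), topic, headers, result);
  }
  // 2) Migrations operate on JSON, so materialize back into an Avro object
  if (result instanceof JsonNode) {
    result = AvroSchemaUtils.toObject((JsonNode) result, readerSchema,
        (DatumReader<Object>) getDatumReader(readerSchema.rawSchema(), readerSchema.rawSchema()));
  }
  // 3) Domain rules last, in READ mode
  return executeRules(getSubject(), topic, headers, RuleMode.READ, null, readerSchema, result);
}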

@@ -21,6 +21,7 @@
 import io.confluent.kafka.schemaregistry.avro.AvroSchema;
 import io.confluent.kafka.schemaregistry.avro.AvroSchemaProvider;
 import io.confluent.kafka.schemaregistry.avro.AvroSchemaUtils;
+import io.confluent.kafka.schemaregistry.client.rest.entities.RuleMode;
 import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException;
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
@@ -35,6 +36,7 @@
 import org.apache.avro.io.EncoderFactory;
 import org.apache.kafka.common.errors.InvalidConfigurationException;
 import org.apache.kafka.common.errors.SerializationException;
+import org.apache.kafka.common.header.Headers;
 
 public abstract class AbstractKafkaAvroSerializer extends AbstractKafkaSchemaSerDe {
 
@@ -83,8 +85,13 @@ protected KafkaAvroSerializerConfig serializerConfig(Properties props) {
     return new KafkaAvroSerializerConfig(props);
   }
 
+  protected byte[] serializeImpl(String subject, Object object, AvroSchema schema)
+      throws SerializationException, InvalidConfigurationException {
+    return serializeImpl(subject, null, null, object, schema);
+  }
+
   protected byte[] serializeImpl(
-      String subject, Object object, AvroSchema schema)
+      String subject, String topic, Headers headers, Object object, AvroSchema schema)
       throws SerializationException, InvalidConfigurationException {
     if (schemaRegistry == null) {
       StringBuilder userFriendlyMsgBuilder = new StringBuilder();
@@ -111,6 +118,10 @@ protected byte[] serializeImpl(
         schema = (AvroSchema)
             lookupSchemaBySubjectAndId(subject, useSchemaId, schema, idCompatStrict);
         id = schemaRegistry.getId(subject, schema);
+      } else if (metadata != null) {
+        restClientErrorMsg = "Error retrieving latest with metadata '" + metadata + "'";
+        schema = (AvroSchema) getLatestWithMetadata(subject);
+        id = schemaRegistry.getId(subject, schema);
       } else if (useLatestVersion) {
         restClientErrorMsg = "Error retrieving latest version of Avro schema";
         schema = (AvroSchema) lookupLatestVersion(subject, schema, latestCompatStrict);
@@ -119,6 +130,7 @@
         restClientErrorMsg = "Error retrieving Avro schema";
         id = schemaRegistry.getId(subject, schema, normalizeSchema);
       }
+      object = executeRules(subject, topic, headers, RuleMode.WRITE, null, schema, object);
       ByteArrayOutputStream out = new ByteArrayOutputStream();
       out.write(MAGIC_BYTE);
       out.write(ByteBuffer.allocate(idSize).putInt(id).array());
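
The write side is the mirror image: resolve the schema id (by explicit id, latest-with-metadata, latest version, or registration), run WRITE-mode rules on the value, then emit the Confluent wire format of magic byte, 4-byte schema id, and Avro-encoded payload. A sketch of that ordering follows; it assumes the members shown in the diff (schemaRegistry, executeRules, MAGIC_BYTE, idSize), collapses id resolution to the simple lookup case, and uses a hypothetical writeDatum(...) in place of the Avro encoding further down in serializeImpl.

// Sketch only: writeDatum(...) is a hypothetical stand-in for the Avro encoding.
byte[] writeWithRules(String subject, String topic, Headers headers,
    Object value, AvroSchema schema) throws IOException, RestClientException {
  int id = schemaRegistry.getId(subject, schema);
  // Domain rules run in WRITE mode before any bytes are produced
  value = executeRules(subject, topic, headers, RuleMode.WRITE, null, schema, value);
  ByteArrayOutputStream out = new ByteArrayOutputStream();
  out.write(MAGIC_BYTE);                                     // 0x0
  out.write(ByteBuffer.allocate(idSize).putInt(id).array()); // 4-byte schema id
  writeDatum(out, value, schema);                            // Avro-encoded payload
  return out.toByteArray();
}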

@@ -17,6 +17,7 @@
 package io.confluent.kafka.serializers;
 
 import org.apache.avro.Schema;
+import org.apache.kafka.common.header.Headers;
 import org.apache.kafka.common.serialization.Deserializer;
 
 import java.util.Map;
@@ -96,7 +97,12 @@ public KafkaAvroDeserializer(SchemaRegistryClient client, Map<String, ?> props,
 
   @Override
   public Object deserialize(String topic, byte[] bytes) {
-    return deserialize(topic, isKey, bytes, specificAvroReaderSchema);
+    return deserialize(topic, null, bytes);
   }
+
+  @Override
+  public Object deserialize(String topic, Headers headers, byte[] bytes) {
+    return deserialize(topic, isKey, headers, bytes, specificAvroReaderSchema);
+  }
 
   /**
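
Finally, a minimal sketch of calling the new header-aware deserialize overload directly; the config value is a placeholder, and a null payload is passed straight through and yields null per the guard in AbstractKafkaAvroDeserializer above, so this particular run needs no live registry.

// Hypothetical usage sketch; in a real consumer, record.headers() and
// record.value() would be passed instead of the placeholders below.
import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import java.util.Collections;
import org.apache.kafka.common.header.internals.RecordHeaders;

public class HeaderAwareDeserializerSketch {
  public static void main(String[] args) {
    KafkaAvroDeserializer deserializer = new KafkaAvroDeserializer();
    deserializer.configure(
        Collections.singletonMap("schema.registry.url", "http://localhost:8081"),
        false); // configure as a value deserializer
    // Wire-format bytes (magic byte, schema id, Avro body) would come from a record;
    // a null payload short-circuits to null.
    Object value = deserializer.deserialize("orders", new RecordHeaders(), null);
    System.out.println(value); // null
    deserializer.close();
  }
}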
[Diff truncated: the remaining 64 of the 69 changed files are not shown.]
