Skip to content

Commit

Permalink
KCA: flag to force optional primitive schemas
Browse files Browse the repository at this point in the history
  • Loading branch information
dlg99 committed Mar 29, 2023
1 parent 07acdbc commit c688a22
Show file tree
Hide file tree
Showing 5 changed files with 203 additions and 64 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ public class KafkaConnectSink implements Sink<GenericObject> {
private PulsarKafkaConnectSinkConfig kafkaSinkConfig;

protected String topicName;
protected boolean useOptionalPrimitives;

private boolean sanitizeTopicName = false;
// Thi is a workaround for https://github.com/apache/pulsar/issues/19922
Expand Down Expand Up @@ -164,6 +165,7 @@ public void open(Map<String, Object> config, SinkContext ctx) throws Exception {
unwrapKeyValueIfAvailable = kafkaSinkConfig.isUnwrapKeyValueIfAvailable();
sanitizeTopicName = kafkaSinkConfig.isSanitizeTopicName();
collapsePartitionedTopics = kafkaSinkConfig.isCollapsePartitionedTopics();
useOptionalPrimitives = kafkaSinkConfig.isUseOptionalPrimitives();

useIndexAsOffset = kafkaSinkConfig.isUseIndexAsOffset();
maxBatchBitsForOffset = kafkaSinkConfig.getMaxBatchBitsForOffset();
Expand Down Expand Up @@ -446,8 +448,11 @@ protected SinkRecord toSinkRecord(Record<GenericObject> sourceRecord) {
&& sourceRecord.getSchema().getSchemaInfo() != null
&& sourceRecord.getSchema().getSchemaInfo().getType() == SchemaType.KEY_VALUE) {
KeyValueSchema kvSchema = (KeyValueSchema) sourceRecord.getSchema();
keySchema = PulsarSchemaToKafkaSchema.getKafkaConnectSchema(kvSchema.getKeySchema());
valueSchema = PulsarSchemaToKafkaSchema.getKafkaConnectSchema(kvSchema.getValueSchema());
// Assume Key_Value schema's key and value are always optional
keySchema = PulsarSchemaToKafkaSchema
.getOptionalKafkaConnectSchema(kvSchema.getKeySchema(), useOptionalPrimitives);
valueSchema = PulsarSchemaToKafkaSchema
.getOptionalKafkaConnectSchema(kvSchema.getValueSchema(), useOptionalPrimitives);

Object nativeObject = sourceRecord.getValue().getNativeObject();

Expand All @@ -464,12 +469,13 @@ protected SinkRecord toSinkRecord(Record<GenericObject> sourceRecord) {
} else {
if (sourceRecord.getMessage().get().hasBase64EncodedKey()) {
key = sourceRecord.getMessage().get().getKeyBytes();
keySchema = Schema.BYTES_SCHEMA;
keySchema = useOptionalPrimitives ? Schema.OPTIONAL_BYTES_SCHEMA : Schema.BYTES_SCHEMA;
} else {
key = sourceRecord.getKey().orElse(null);
keySchema = Schema.STRING_SCHEMA;
keySchema = useOptionalPrimitives ? Schema.OPTIONAL_STRING_SCHEMA : Schema.STRING_SCHEMA;
}
valueSchema = PulsarSchemaToKafkaSchema.getKafkaConnectSchema(sourceRecord.getSchema());
valueSchema = PulsarSchemaToKafkaSchema
.getKafkaConnectSchema(sourceRecord.getSchema(), useOptionalPrimitives);
value = KafkaConnectData.getKafkaConnectData(sourceRecord.getValue().getNativeObject(), valueSchema);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,12 @@ public class PulsarKafkaConnectSinkConfig implements Serializable {
help = "Supply kafka record with topic name without -partition- suffix for partitioned topics.")
private boolean collapsePartitionedTopics = false;

@FieldDoc(
defaultValue = "false",
help = "Pulsar schema does not contain information whether the Schema is optional, Kafka's does. \n"
+ "This provides a way to force all primitive schemas to be optional for Kafka. \n")
private boolean useOptionalPrimitives = false;

public static PulsarKafkaConnectSinkConfig load(String yamlFile) throws IOException {
ObjectMapper mapper = new ObjectMapper(new YAMLFactory());
return mapper.readValue(new File(yamlFile), PulsarKafkaConnectSinkConfig.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,11 +115,15 @@ public Schema schema() {
}

private static final ImmutableMap<SchemaType, Schema> pulsarSchemaTypeToKafkaSchema;
private static final ImmutableMap<SchemaType, Schema> pulsarSchemaTypeToOptionalKafkaSchema;
private static final ImmutableSet<String> kafkaLogicalSchemas;
private static final AvroData avroData = new AvroData(1000);
private static final Cache<byte[], Schema> schemaCache =
CacheBuilder.newBuilder().maximumSize(10000)
.expireAfterAccess(30, TimeUnit.MINUTES).build();
private static final Cache<Schema, Schema> optionalSchemaCache =
CacheBuilder.newBuilder().maximumSize(1000)
.expireAfterAccess(30, TimeUnit.MINUTES).build();

static {
pulsarSchemaTypeToKafkaSchema = ImmutableMap.<SchemaType, Schema>builder()
Expand All @@ -134,6 +138,17 @@ public Schema schema() {
.put(SchemaType.BYTES, Schema.BYTES_SCHEMA)
.put(SchemaType.DATE, Date.SCHEMA)
.build();
pulsarSchemaTypeToOptionalKafkaSchema = ImmutableMap.<SchemaType, Schema>builder()
.put(SchemaType.BOOLEAN, Schema.OPTIONAL_BOOLEAN_SCHEMA)
.put(SchemaType.INT8, Schema.OPTIONAL_INT8_SCHEMA)
.put(SchemaType.INT16, Schema.OPTIONAL_INT16_SCHEMA)
.put(SchemaType.INT32, Schema.OPTIONAL_INT32_SCHEMA)
.put(SchemaType.INT64, Schema.OPTIONAL_INT64_SCHEMA)
.put(SchemaType.FLOAT, Schema.OPTIONAL_FLOAT32_SCHEMA)
.put(SchemaType.DOUBLE, Schema.OPTIONAL_FLOAT64_SCHEMA)
.put(SchemaType.STRING, Schema.OPTIONAL_STRING_SCHEMA)
.put(SchemaType.BYTES, Schema.OPTIONAL_BYTES_SCHEMA)
.build();
kafkaLogicalSchemas = ImmutableSet.<String>builder()
.add(Timestamp.LOGICAL_NAME)
.add(Date.LOGICAL_NAME)
Expand All @@ -153,12 +168,33 @@ private static org.apache.avro.Schema parseAvroSchema(String schemaJson) {
return parser.parse(schemaJson);
}

public static Schema getOptionalKafkaConnectSchema(org.apache.pulsar.client.api.Schema pulsarSchema) {
Schema s = getKafkaConnectSchema(pulsarSchema);
return new OptionalForcingSchema(s);
public static Schema makeOptional(Schema s) {
if (s == null || s.isOptional()) {
return s;
}

String logicalSchemaName = s.name();
if (kafkaLogicalSchemas.contains(logicalSchemaName)) {
return s;
}

try {
return optionalSchemaCache.get(s, () -> new OptionalForcingSchema(s));
} catch (ExecutionException | UncheckedExecutionException | ExecutionError ee) {
String msg = "Failed to create optional schema for " + s;
log.error(msg);
throw new IllegalStateException(msg, ee);
}
}

public static Schema getKafkaConnectSchema(org.apache.pulsar.client.api.Schema pulsarSchema) {
public static Schema getOptionalKafkaConnectSchema(org.apache.pulsar.client.api.Schema pulsarSchema,
boolean useOptionalPrimitives) {
return makeOptional(getKafkaConnectSchema(pulsarSchema, useOptionalPrimitives));

}

public static Schema getKafkaConnectSchema(org.apache.pulsar.client.api.Schema pulsarSchema,
boolean useOptionalPrimitives) {
if (pulsarSchema == null || pulsarSchema.getSchemaInfo() == null) {
throw logAndThrowOnUnsupportedSchema(pulsarSchema, "Schema is required.", null);
}
Expand Down Expand Up @@ -191,6 +227,11 @@ public static Schema getKafkaConnectSchema(org.apache.pulsar.client.api.Schema p
throw new IllegalStateException("Unsupported Kafka Logical Schema " + logicalSchemaName);
}

if (useOptionalPrimitives
&& pulsarSchemaTypeToOptionalKafkaSchema.containsKey(pulsarSchema.getSchemaInfo().getType())) {
return pulsarSchemaTypeToOptionalKafkaSchema.get(pulsarSchema.getSchemaInfo().getType());
}

if (pulsarSchemaTypeToKafkaSchema.containsKey(pulsarSchema.getSchemaInfo().getType())) {
return pulsarSchemaTypeToKafkaSchema.get(pulsarSchema.getSchemaInfo().getType());
}
Expand All @@ -199,8 +240,10 @@ public static Schema getKafkaConnectSchema(org.apache.pulsar.client.api.Schema p
return schemaCache.get(pulsarSchema.getSchemaInfo().getSchema(), () -> {
if (pulsarSchema.getSchemaInfo().getType() == SchemaType.KEY_VALUE) {
KeyValueSchema kvSchema = (KeyValueSchema) pulsarSchema;
return SchemaBuilder.map(getKafkaConnectSchema(kvSchema.getKeySchema()),
getOptionalKafkaConnectSchema(kvSchema.getValueSchema()))
return SchemaBuilder.map(
makeOptional(getKafkaConnectSchema(kvSchema.getKeySchema(), useOptionalPrimitives)),
makeOptional(getKafkaConnectSchema(kvSchema.getValueSchema(), useOptionalPrimitives)))
.optional()
.build();
}
org.apache.avro.Schema avroSchema = parseAvroSchema(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ public T answer(InvocationOnMock invocationOnMock) throws Throwable {
}
}

private String offsetTopicName = "persistent://my-property/my-ns/kafka-connect-sink-offset";
final private String offsetTopicName = "persistent://my-property/my-ns/kafka-connect-sink-offset";

private Path file;
private Map<String, Object> props;
Expand Down Expand Up @@ -797,7 +797,9 @@ public void kafkaLogicalTypesTimestampTest() {
.build());

org.apache.kafka.connect.data.Schema kafkaSchema = PulsarSchemaToKafkaSchema
.getKafkaConnectSchema(schema);
.getKafkaConnectSchema(schema, true);

Assert.assertFalse(kafkaSchema.isOptional());

java.util.Date date = getDateFromString("12/30/1999 11:12:13");
Object connectData = KafkaConnectData
Expand All @@ -815,7 +817,9 @@ public void kafkaLogicalTypesTimeTest() {
.build());

org.apache.kafka.connect.data.Schema kafkaSchema = PulsarSchemaToKafkaSchema
.getKafkaConnectSchema(schema);
.getKafkaConnectSchema(schema, true);

Assert.assertFalse(kafkaSchema.isOptional());

java.util.Date date = getDateFromString("01/01/1970 11:12:13");
Object connectData = KafkaConnectData
Expand All @@ -833,7 +837,9 @@ public void kafkaLogicalTypesDateTest() {
.build());

org.apache.kafka.connect.data.Schema kafkaSchema = PulsarSchemaToKafkaSchema
.getKafkaConnectSchema(schema);
.getKafkaConnectSchema(schema, true);

Assert.assertFalse(kafkaSchema.isOptional());

java.util.Date date = getDateFromString("12/31/2022 00:00:00");
Object connectData = KafkaConnectData
Expand All @@ -854,7 +860,9 @@ public void kafkaLogicalTypesDecimalTest() {
.build());

org.apache.kafka.connect.data.Schema kafkaSchema = PulsarSchemaToKafkaSchema
.getKafkaConnectSchema(schema);
.getKafkaConnectSchema(schema, true);

Assert.assertFalse(kafkaSchema.isOptional());

Object connectData = KafkaConnectData
.getKafkaConnectData(Decimal.fromLogical(kafkaSchema, BigDecimal.valueOf(100L, 10)), kafkaSchema);
Expand All @@ -874,11 +882,11 @@ public void connectDataComplexAvroSchemaGenericRecordTest() {
getGenericRecord(value, pulsarAvroSchema));

org.apache.kafka.connect.data.Schema kafkaSchema = PulsarSchemaToKafkaSchema
.getKafkaConnectSchema(Schema.KeyValue(pulsarAvroSchema, pulsarAvroSchema));
.getKafkaConnectSchema(Schema.KeyValue(pulsarAvroSchema, pulsarAvroSchema), false);

Object connectData = KafkaConnectData.getKafkaConnectData(kv, kafkaSchema);

org.apache.kafka.connect.data.ConnectSchema.validateValue(kafkaSchema, connectData);
Assert.assertTrue(kafkaSchema.isOptional());
Assert.assertTrue(kafkaSchema.keySchema().isOptional());
Assert.assertTrue(kafkaSchema.valueSchema().isOptional());
}

@Test
Expand Down Expand Up @@ -990,7 +998,8 @@ private void testPojoAsAvroAndJsonConversionToConnectData(Object pojo, AvroSchem
Object value = pojoAsAvroRecord(pojo, pulsarAvroSchema);

org.apache.kafka.connect.data.Schema kafkaSchema = PulsarSchemaToKafkaSchema
.getKafkaConnectSchema(pulsarAvroSchema);
.getKafkaConnectSchema(pulsarAvroSchema, false);
Assert.assertFalse(kafkaSchema.isOptional());

Object connectData = KafkaConnectData.getKafkaConnectData(value, kafkaSchema);

Expand All @@ -999,6 +1008,18 @@ private void testPojoAsAvroAndJsonConversionToConnectData(Object pojo, AvroSchem
Object jsonNode = pojoAsJsonNode(pojo);
connectData = KafkaConnectData.getKafkaConnectData(jsonNode, kafkaSchema);
org.apache.kafka.connect.data.ConnectSchema.validateValue(kafkaSchema, connectData);

kafkaSchema = PulsarSchemaToKafkaSchema
.getKafkaConnectSchema(pulsarAvroSchema, true);
Assert.assertFalse(kafkaSchema.isOptional());

connectData = KafkaConnectData.getKafkaConnectData(value, kafkaSchema);

org.apache.kafka.connect.data.ConnectSchema.validateValue(kafkaSchema, connectData);

jsonNode = pojoAsJsonNode(pojo);
connectData = KafkaConnectData.getKafkaConnectData(jsonNode, kafkaSchema);
org.apache.kafka.connect.data.ConnectSchema.validateValue(kafkaSchema, connectData);
}

private JsonNode pojoAsJsonNode(Object pojo) {
Expand Down
Loading

0 comments on commit c688a22

Please sign in to comment.