diff --git a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSink.java b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSink.java index 475724cc4e545..10efc91ccdaad 100644 --- a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSink.java +++ b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSink.java @@ -90,6 +90,7 @@ public class KafkaConnectSink implements Sink { private PulsarKafkaConnectSinkConfig kafkaSinkConfig; protected String topicName; + protected boolean useOptionalPrimitives; private boolean sanitizeTopicName = false; // Thi is a workaround for https://github.com/apache/pulsar/issues/19922 @@ -164,6 +165,7 @@ public void open(Map config, SinkContext ctx) throws Exception { unwrapKeyValueIfAvailable = kafkaSinkConfig.isUnwrapKeyValueIfAvailable(); sanitizeTopicName = kafkaSinkConfig.isSanitizeTopicName(); collapsePartitionedTopics = kafkaSinkConfig.isCollapsePartitionedTopics(); + useOptionalPrimitives = kafkaSinkConfig.isUseOptionalPrimitives(); useIndexAsOffset = kafkaSinkConfig.isUseIndexAsOffset(); maxBatchBitsForOffset = kafkaSinkConfig.getMaxBatchBitsForOffset(); @@ -446,8 +448,11 @@ protected SinkRecord toSinkRecord(Record sourceRecord) { && sourceRecord.getSchema().getSchemaInfo() != null && sourceRecord.getSchema().getSchemaInfo().getType() == SchemaType.KEY_VALUE) { KeyValueSchema kvSchema = (KeyValueSchema) sourceRecord.getSchema(); - keySchema = PulsarSchemaToKafkaSchema.getKafkaConnectSchema(kvSchema.getKeySchema()); - valueSchema = PulsarSchemaToKafkaSchema.getKafkaConnectSchema(kvSchema.getValueSchema()); + // Assume Key_Value schema's key and value are always optional + keySchema = PulsarSchemaToKafkaSchema + .getOptionalKafkaConnectSchema(kvSchema.getKeySchema(), useOptionalPrimitives); + valueSchema = PulsarSchemaToKafkaSchema + .getOptionalKafkaConnectSchema(kvSchema.getValueSchema(), useOptionalPrimitives); Object nativeObject = sourceRecord.getValue().getNativeObject(); @@ -464,12 +469,13 @@ protected SinkRecord toSinkRecord(Record sourceRecord) { } else { if (sourceRecord.getMessage().get().hasBase64EncodedKey()) { key = sourceRecord.getMessage().get().getKeyBytes(); - keySchema = Schema.BYTES_SCHEMA; + keySchema = useOptionalPrimitives ? Schema.OPTIONAL_BYTES_SCHEMA : Schema.BYTES_SCHEMA; } else { key = sourceRecord.getKey().orElse(null); - keySchema = Schema.STRING_SCHEMA; + keySchema = useOptionalPrimitives ? Schema.OPTIONAL_STRING_SCHEMA : Schema.STRING_SCHEMA; } - valueSchema = PulsarSchemaToKafkaSchema.getKafkaConnectSchema(sourceRecord.getSchema()); + valueSchema = PulsarSchemaToKafkaSchema + .getKafkaConnectSchema(sourceRecord.getSchema(), useOptionalPrimitives); value = KafkaConnectData.getKafkaConnectData(sourceRecord.getValue().getNativeObject(), valueSchema); } diff --git a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/PulsarKafkaConnectSinkConfig.java b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/PulsarKafkaConnectSinkConfig.java index 2525081a41ebb..96519e63e0afa 100644 --- a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/PulsarKafkaConnectSinkConfig.java +++ b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/PulsarKafkaConnectSinkConfig.java @@ -99,6 +99,12 @@ public class PulsarKafkaConnectSinkConfig implements Serializable { help = "Supply kafka record with topic name without -partition- suffix for partitioned topics.") private boolean collapsePartitionedTopics = false; + @FieldDoc( + defaultValue = "false", + help = "Pulsar schema does not contain information whether the Schema is optional, Kafka's does. \n" + + "This provides a way to force all primitive schemas to be optional for Kafka. \n") + private boolean useOptionalPrimitives = false; + public static PulsarKafkaConnectSinkConfig load(String yamlFile) throws IOException { ObjectMapper mapper = new ObjectMapper(new YAMLFactory()); return mapper.readValue(new File(yamlFile), PulsarKafkaConnectSinkConfig.class); diff --git a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/schema/PulsarSchemaToKafkaSchema.java b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/schema/PulsarSchemaToKafkaSchema.java index faf28585e8aed..21a0a0f42e025 100644 --- a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/schema/PulsarSchemaToKafkaSchema.java +++ b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/schema/PulsarSchemaToKafkaSchema.java @@ -115,11 +115,15 @@ public Schema schema() { } private static final ImmutableMap pulsarSchemaTypeToKafkaSchema; + private static final ImmutableMap pulsarSchemaTypeToOptionalKafkaSchema; private static final ImmutableSet kafkaLogicalSchemas; private static final AvroData avroData = new AvroData(1000); private static final Cache schemaCache = CacheBuilder.newBuilder().maximumSize(10000) .expireAfterAccess(30, TimeUnit.MINUTES).build(); + private static final Cache optionalSchemaCache = + CacheBuilder.newBuilder().maximumSize(1000) + .expireAfterAccess(30, TimeUnit.MINUTES).build(); static { pulsarSchemaTypeToKafkaSchema = ImmutableMap.builder() @@ -134,6 +138,17 @@ public Schema schema() { .put(SchemaType.BYTES, Schema.BYTES_SCHEMA) .put(SchemaType.DATE, Date.SCHEMA) .build(); + pulsarSchemaTypeToOptionalKafkaSchema = ImmutableMap.builder() + .put(SchemaType.BOOLEAN, Schema.OPTIONAL_BOOLEAN_SCHEMA) + .put(SchemaType.INT8, Schema.OPTIONAL_INT8_SCHEMA) + .put(SchemaType.INT16, Schema.OPTIONAL_INT16_SCHEMA) + .put(SchemaType.INT32, Schema.OPTIONAL_INT32_SCHEMA) + .put(SchemaType.INT64, Schema.OPTIONAL_INT64_SCHEMA) + .put(SchemaType.FLOAT, Schema.OPTIONAL_FLOAT32_SCHEMA) + .put(SchemaType.DOUBLE, Schema.OPTIONAL_FLOAT64_SCHEMA) + .put(SchemaType.STRING, Schema.OPTIONAL_STRING_SCHEMA) + .put(SchemaType.BYTES, Schema.OPTIONAL_BYTES_SCHEMA) + .build(); kafkaLogicalSchemas = ImmutableSet.builder() .add(Timestamp.LOGICAL_NAME) .add(Date.LOGICAL_NAME) @@ -153,12 +168,33 @@ private static org.apache.avro.Schema parseAvroSchema(String schemaJson) { return parser.parse(schemaJson); } - public static Schema getOptionalKafkaConnectSchema(org.apache.pulsar.client.api.Schema pulsarSchema) { - Schema s = getKafkaConnectSchema(pulsarSchema); - return new OptionalForcingSchema(s); + public static Schema makeOptional(Schema s) { + if (s == null || s.isOptional()) { + return s; + } + + String logicalSchemaName = s.name(); + if (kafkaLogicalSchemas.contains(logicalSchemaName)) { + return s; + } + + try { + return optionalSchemaCache.get(s, () -> new OptionalForcingSchema(s)); + } catch (ExecutionException | UncheckedExecutionException | ExecutionError ee) { + String msg = "Failed to create optional schema for " + s; + log.error(msg); + throw new IllegalStateException(msg, ee); + } } - public static Schema getKafkaConnectSchema(org.apache.pulsar.client.api.Schema pulsarSchema) { + public static Schema getOptionalKafkaConnectSchema(org.apache.pulsar.client.api.Schema pulsarSchema, + boolean useOptionalPrimitives) { + return makeOptional(getKafkaConnectSchema(pulsarSchema, useOptionalPrimitives)); + + } + + public static Schema getKafkaConnectSchema(org.apache.pulsar.client.api.Schema pulsarSchema, + boolean useOptionalPrimitives) { if (pulsarSchema == null || pulsarSchema.getSchemaInfo() == null) { throw logAndThrowOnUnsupportedSchema(pulsarSchema, "Schema is required.", null); } @@ -191,6 +227,11 @@ public static Schema getKafkaConnectSchema(org.apache.pulsar.client.api.Schema p throw new IllegalStateException("Unsupported Kafka Logical Schema " + logicalSchemaName); } + if (useOptionalPrimitives + && pulsarSchemaTypeToOptionalKafkaSchema.containsKey(pulsarSchema.getSchemaInfo().getType())) { + return pulsarSchemaTypeToOptionalKafkaSchema.get(pulsarSchema.getSchemaInfo().getType()); + } + if (pulsarSchemaTypeToKafkaSchema.containsKey(pulsarSchema.getSchemaInfo().getType())) { return pulsarSchemaTypeToKafkaSchema.get(pulsarSchema.getSchemaInfo().getType()); } @@ -199,8 +240,10 @@ public static Schema getKafkaConnectSchema(org.apache.pulsar.client.api.Schema p return schemaCache.get(pulsarSchema.getSchemaInfo().getSchema(), () -> { if (pulsarSchema.getSchemaInfo().getType() == SchemaType.KEY_VALUE) { KeyValueSchema kvSchema = (KeyValueSchema) pulsarSchema; - return SchemaBuilder.map(getKafkaConnectSchema(kvSchema.getKeySchema()), - getOptionalKafkaConnectSchema(kvSchema.getValueSchema())) + return SchemaBuilder.map( + makeOptional(getKafkaConnectSchema(kvSchema.getKeySchema(), useOptionalPrimitives)), + makeOptional(getKafkaConnectSchema(kvSchema.getValueSchema(), useOptionalPrimitives))) + .optional() .build(); } org.apache.avro.Schema avroSchema = parseAvroSchema( diff --git a/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSinkTest.java b/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSinkTest.java index 6ccfa3b71a2f7..5410e0bb8d664 100644 --- a/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSinkTest.java +++ b/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSinkTest.java @@ -162,7 +162,7 @@ public T answer(InvocationOnMock invocationOnMock) throws Throwable { } } - private String offsetTopicName = "persistent://my-property/my-ns/kafka-connect-sink-offset"; + final private String offsetTopicName = "persistent://my-property/my-ns/kafka-connect-sink-offset"; private Path file; private Map props; @@ -797,7 +797,9 @@ public void kafkaLogicalTypesTimestampTest() { .build()); org.apache.kafka.connect.data.Schema kafkaSchema = PulsarSchemaToKafkaSchema - .getKafkaConnectSchema(schema); + .getKafkaConnectSchema(schema, true); + + Assert.assertFalse(kafkaSchema.isOptional()); java.util.Date date = getDateFromString("12/30/1999 11:12:13"); Object connectData = KafkaConnectData @@ -815,7 +817,9 @@ public void kafkaLogicalTypesTimeTest() { .build()); org.apache.kafka.connect.data.Schema kafkaSchema = PulsarSchemaToKafkaSchema - .getKafkaConnectSchema(schema); + .getKafkaConnectSchema(schema, true); + + Assert.assertFalse(kafkaSchema.isOptional()); java.util.Date date = getDateFromString("01/01/1970 11:12:13"); Object connectData = KafkaConnectData @@ -833,7 +837,9 @@ public void kafkaLogicalTypesDateTest() { .build()); org.apache.kafka.connect.data.Schema kafkaSchema = PulsarSchemaToKafkaSchema - .getKafkaConnectSchema(schema); + .getKafkaConnectSchema(schema, true); + + Assert.assertFalse(kafkaSchema.isOptional()); java.util.Date date = getDateFromString("12/31/2022 00:00:00"); Object connectData = KafkaConnectData @@ -854,7 +860,9 @@ public void kafkaLogicalTypesDecimalTest() { .build()); org.apache.kafka.connect.data.Schema kafkaSchema = PulsarSchemaToKafkaSchema - .getKafkaConnectSchema(schema); + .getKafkaConnectSchema(schema, true); + + Assert.assertFalse(kafkaSchema.isOptional()); Object connectData = KafkaConnectData .getKafkaConnectData(Decimal.fromLogical(kafkaSchema, BigDecimal.valueOf(100L, 10)), kafkaSchema); @@ -874,11 +882,11 @@ public void connectDataComplexAvroSchemaGenericRecordTest() { getGenericRecord(value, pulsarAvroSchema)); org.apache.kafka.connect.data.Schema kafkaSchema = PulsarSchemaToKafkaSchema - .getKafkaConnectSchema(Schema.KeyValue(pulsarAvroSchema, pulsarAvroSchema)); + .getKafkaConnectSchema(Schema.KeyValue(pulsarAvroSchema, pulsarAvroSchema), false); - Object connectData = KafkaConnectData.getKafkaConnectData(kv, kafkaSchema); - - org.apache.kafka.connect.data.ConnectSchema.validateValue(kafkaSchema, connectData); + Assert.assertTrue(kafkaSchema.isOptional()); + Assert.assertTrue(kafkaSchema.keySchema().isOptional()); + Assert.assertTrue(kafkaSchema.valueSchema().isOptional()); } @Test @@ -990,7 +998,8 @@ private void testPojoAsAvroAndJsonConversionToConnectData(Object pojo, AvroSchem Object value = pojoAsAvroRecord(pojo, pulsarAvroSchema); org.apache.kafka.connect.data.Schema kafkaSchema = PulsarSchemaToKafkaSchema - .getKafkaConnectSchema(pulsarAvroSchema); + .getKafkaConnectSchema(pulsarAvroSchema, false); + Assert.assertFalse(kafkaSchema.isOptional()); Object connectData = KafkaConnectData.getKafkaConnectData(value, kafkaSchema); @@ -999,6 +1008,18 @@ private void testPojoAsAvroAndJsonConversionToConnectData(Object pojo, AvroSchem Object jsonNode = pojoAsJsonNode(pojo); connectData = KafkaConnectData.getKafkaConnectData(jsonNode, kafkaSchema); org.apache.kafka.connect.data.ConnectSchema.validateValue(kafkaSchema, connectData); + + kafkaSchema = PulsarSchemaToKafkaSchema + .getKafkaConnectSchema(pulsarAvroSchema, true); + Assert.assertFalse(kafkaSchema.isOptional()); + + connectData = KafkaConnectData.getKafkaConnectData(value, kafkaSchema); + + org.apache.kafka.connect.data.ConnectSchema.validateValue(kafkaSchema, connectData); + + jsonNode = pojoAsJsonNode(pojo); + connectData = KafkaConnectData.getKafkaConnectData(jsonNode, kafkaSchema); + org.apache.kafka.connect.data.ConnectSchema.validateValue(kafkaSchema, connectData); } private JsonNode pojoAsJsonNode(Object pojo) { diff --git a/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/PulsarSchemaToKafkaSchemaTest.java b/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/PulsarSchemaToKafkaSchemaTest.java index 9cc6db034c870..b236365bbb8a1 100644 --- a/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/PulsarSchemaToKafkaSchemaTest.java +++ b/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/PulsarSchemaToKafkaSchemaTest.java @@ -32,6 +32,7 @@ import org.apache.pulsar.client.impl.schema.KeyValueSchemaImpl; import org.apache.pulsar.io.kafka.connect.schema.KafkaConnectData; import org.apache.pulsar.io.kafka.connect.schema.PulsarSchemaToKafkaSchema; +import org.testng.annotations.DataProvider; import org.testng.annotations.Test; import java.math.BigInteger; @@ -39,6 +40,8 @@ import java.util.Map; import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertFalse; +import static org.testng.Assert.assertTrue; /** * Test the conversion of PulsarSchema To KafkaSchema\. @@ -132,101 +135,134 @@ static class ComplexStruct { String[] stringArr; } - @Test - public void bytesSchemaTest() { + @DataProvider(name = "useOptionalPrimitives") + public static Object[][] useOptionalPrimitives() { + return new Object[][] { + {true}, + {false} + }; + } + + @Test(dataProvider = "useOptionalPrimitives") + public void bytesSchemaTest(boolean useOptionalPrimitives) { org.apache.kafka.connect.data.Schema kafkaSchema = - PulsarSchemaToKafkaSchema.getKafkaConnectSchema(Schema.BYTES); + PulsarSchemaToKafkaSchema.getKafkaConnectSchema(Schema.BYTES, useOptionalPrimitives); assertEquals(kafkaSchema.type(), org.apache.kafka.connect.data.Schema.Type.BYTES); + assertEquals(useOptionalPrimitives, kafkaSchema.isOptional()); kafkaSchema = - PulsarSchemaToKafkaSchema.getKafkaConnectSchema(Schema.BYTEBUFFER); + PulsarSchemaToKafkaSchema.getKafkaConnectSchema(Schema.BYTEBUFFER, useOptionalPrimitives); assertEquals(kafkaSchema.type(), org.apache.kafka.connect.data.Schema.Type.BYTES); + assertEquals(useOptionalPrimitives, kafkaSchema.isOptional()); } - @Test - public void stringSchemaTest() { + @Test(dataProvider = "useOptionalPrimitives") + public void stringSchemaTest(boolean useOptionalPrimitives) { org.apache.kafka.connect.data.Schema kafkaSchema = - PulsarSchemaToKafkaSchema.getKafkaConnectSchema(Schema.STRING); + PulsarSchemaToKafkaSchema.getKafkaConnectSchema(Schema.STRING, useOptionalPrimitives); assertEquals(kafkaSchema.type(), org.apache.kafka.connect.data.Schema.Type.STRING); + assertEquals(useOptionalPrimitives, kafkaSchema.isOptional()); } - @Test - public void booleanSchemaTest() { + @Test(dataProvider = "useOptionalPrimitives") + public void booleanSchemaTest(boolean useOptionalPrimitives) { org.apache.kafka.connect.data.Schema kafkaSchema = - PulsarSchemaToKafkaSchema.getKafkaConnectSchema(Schema.BOOL); + PulsarSchemaToKafkaSchema.getKafkaConnectSchema(Schema.BOOL, useOptionalPrimitives); assertEquals(kafkaSchema.type(), org.apache.kafka.connect.data.Schema.Type.BOOLEAN); + assertEquals(useOptionalPrimitives, kafkaSchema.isOptional()); } - @Test - public void int8SchemaTest() { + @Test(dataProvider = "useOptionalPrimitives") + public void int8SchemaTest(boolean useOptionalPrimitives) { org.apache.kafka.connect.data.Schema kafkaSchema = - PulsarSchemaToKafkaSchema.getKafkaConnectSchema(Schema.INT8); + PulsarSchemaToKafkaSchema.getKafkaConnectSchema(Schema.INT8, useOptionalPrimitives); assertEquals(kafkaSchema.type(), org.apache.kafka.connect.data.Schema.Type.INT8); + assertEquals(useOptionalPrimitives, kafkaSchema.isOptional()); } - @Test - public void int16SchemaTest() { + @Test(dataProvider = "useOptionalPrimitives") + public void int16SchemaTest(boolean useOptionalPrimitives) { org.apache.kafka.connect.data.Schema kafkaSchema = - PulsarSchemaToKafkaSchema.getKafkaConnectSchema(Schema.INT16); + PulsarSchemaToKafkaSchema.getKafkaConnectSchema(Schema.INT16, useOptionalPrimitives); assertEquals(kafkaSchema.type(), org.apache.kafka.connect.data.Schema.Type.INT16); + assertEquals(useOptionalPrimitives, kafkaSchema.isOptional()); } - @Test - public void int32SchemaTest() { + @Test(dataProvider = "useOptionalPrimitives") + public void int32SchemaTest(boolean useOptionalPrimitives) { org.apache.kafka.connect.data.Schema kafkaSchema = - PulsarSchemaToKafkaSchema.getKafkaConnectSchema(Schema.INT32); + PulsarSchemaToKafkaSchema.getKafkaConnectSchema(Schema.INT32, useOptionalPrimitives); assertEquals(kafkaSchema.type(), org.apache.kafka.connect.data.Schema.Type.INT32); + assertEquals(useOptionalPrimitives, kafkaSchema.isOptional()); } - @Test - public void int64SchemaTest() { + @Test(dataProvider = "useOptionalPrimitives") + public void int64SchemaTest(boolean useOptionalPrimitives) { org.apache.kafka.connect.data.Schema kafkaSchema = - PulsarSchemaToKafkaSchema.getKafkaConnectSchema(Schema.INT64); + PulsarSchemaToKafkaSchema.getKafkaConnectSchema(Schema.INT64, useOptionalPrimitives); assertEquals(kafkaSchema.type(), org.apache.kafka.connect.data.Schema.Type.INT64); + assertEquals(useOptionalPrimitives, kafkaSchema.isOptional()); } - @Test - public void float32SchemaTest() { + @Test(dataProvider = "useOptionalPrimitives") + public void float32SchemaTest(boolean useOptionalPrimitives) { org.apache.kafka.connect.data.Schema kafkaSchema = - PulsarSchemaToKafkaSchema.getKafkaConnectSchema(Schema.FLOAT); + PulsarSchemaToKafkaSchema.getKafkaConnectSchema(Schema.FLOAT, useOptionalPrimitives); assertEquals(kafkaSchema.type(), org.apache.kafka.connect.data.Schema.Type.FLOAT32); + assertEquals(useOptionalPrimitives, kafkaSchema.isOptional()); } - @Test - public void float64SchemaTest() { + @Test(dataProvider = "useOptionalPrimitives") + public void float64SchemaTest(boolean useOptionalPrimitives) { org.apache.kafka.connect.data.Schema kafkaSchema = - PulsarSchemaToKafkaSchema.getKafkaConnectSchema(Schema.DOUBLE); + PulsarSchemaToKafkaSchema.getKafkaConnectSchema(Schema.DOUBLE, useOptionalPrimitives); assertEquals(kafkaSchema.type(), org.apache.kafka.connect.data.Schema.Type.FLOAT64); + assertEquals(useOptionalPrimitives, kafkaSchema.isOptional()); } - @Test - public void kvBytesSchemaTest() { + @Test(dataProvider = "useOptionalPrimitives") + public void kvBytesSchemaTest(boolean useOptionalPrimitives) { org.apache.kafka.connect.data.Schema kafkaSchema = - PulsarSchemaToKafkaSchema.getKafkaConnectSchema(Schema.KV_BYTES()); + PulsarSchemaToKafkaSchema.getKafkaConnectSchema(Schema.KV_BYTES(), useOptionalPrimitives); assertEquals(kafkaSchema.type(), org.apache.kafka.connect.data.Schema.Type.MAP); assertEquals(kafkaSchema.keySchema().type(), org.apache.kafka.connect.data.Schema.Type.BYTES); assertEquals(kafkaSchema.valueSchema().type(), org.apache.kafka.connect.data.Schema.Type.BYTES); + assertTrue(kafkaSchema.isOptional()); + + // key and value are always optional + assertTrue(kafkaSchema.keySchema().isOptional()); + assertTrue(kafkaSchema.valueSchema().isOptional()); } @Test public void kvBytesIntSchemaTests() { Schema pulsarKvSchema = KeyValueSchemaImpl.of(Schema.STRING, Schema.INT64); org.apache.kafka.connect.data.Schema kafkaSchema = - PulsarSchemaToKafkaSchema.getKafkaConnectSchema(pulsarKvSchema); + PulsarSchemaToKafkaSchema.getKafkaConnectSchema(pulsarKvSchema, false); assertEquals(kafkaSchema.type(), org.apache.kafka.connect.data.Schema.Type.MAP); assertEquals(kafkaSchema.keySchema().type(), org.apache.kafka.connect.data.Schema.Type.STRING); assertEquals(kafkaSchema.valueSchema().type(), org.apache.kafka.connect.data.Schema.Type.INT64); + assertTrue(kafkaSchema.isOptional()); + + // key and value are always optional + assertTrue(kafkaSchema.keySchema().isOptional()); + assertTrue(kafkaSchema.valueSchema().isOptional()); } @Test public void avroSchemaTest() { AvroSchema pulsarAvroSchema = AvroSchema.of(StructWithAnnotations.class); org.apache.kafka.connect.data.Schema kafkaSchema = - PulsarSchemaToKafkaSchema.getKafkaConnectSchema(pulsarAvroSchema); + PulsarSchemaToKafkaSchema.getKafkaConnectSchema(pulsarAvroSchema, false); + org.apache.kafka.connect.data.Schema kafkaSchemaOpt = + PulsarSchemaToKafkaSchema.getKafkaConnectSchema(pulsarAvroSchema, true); assertEquals(kafkaSchema.type(), org.apache.kafka.connect.data.Schema.Type.STRUCT); assertEquals(kafkaSchema.fields().size(), STRUCT_FIELDS.size()); for (String name: STRUCT_FIELDS) { assertEquals(kafkaSchema.field(name).name(), name); + // set by avro schema + assertEquals(kafkaSchema.field(name).schema().isOptional(), + kafkaSchemaOpt.field(name).schema().isOptional()); } } @@ -234,11 +270,16 @@ public void avroSchemaTest() { public void avroComplexSchemaTest() { AvroSchema pulsarAvroSchema = AvroSchema.of(ComplexStruct.class); org.apache.kafka.connect.data.Schema kafkaSchema = - PulsarSchemaToKafkaSchema.getKafkaConnectSchema(pulsarAvroSchema); + PulsarSchemaToKafkaSchema.getKafkaConnectSchema(pulsarAvroSchema, false); + org.apache.kafka.connect.data.Schema kafkaSchemaOpt = + PulsarSchemaToKafkaSchema.getKafkaConnectSchema(pulsarAvroSchema, true); assertEquals(kafkaSchema.type(), org.apache.kafka.connect.data.Schema.Type.STRUCT); assertEquals(kafkaSchema.fields().size(), COMPLEX_STRUCT_FIELDS.size()); for (String name: COMPLEX_STRUCT_FIELDS) { assertEquals(kafkaSchema.field(name).name(), name); + // set by avro schema + assertEquals(kafkaSchema.field(name).schema().isOptional(), + kafkaSchemaOpt.field(name).schema().isOptional()); } } @@ -250,11 +291,16 @@ public void jsonSchemaTest() { .withAlwaysAllowNull(false) .build()); org.apache.kafka.connect.data.Schema kafkaSchema = - PulsarSchemaToKafkaSchema.getKafkaConnectSchema(jsonSchema); + PulsarSchemaToKafkaSchema.getKafkaConnectSchema(jsonSchema, false); + org.apache.kafka.connect.data.Schema kafkaSchemaOpt = + PulsarSchemaToKafkaSchema.getKafkaConnectSchema(jsonSchema, true); assertEquals(kafkaSchema.type(), org.apache.kafka.connect.data.Schema.Type.STRUCT); assertEquals(kafkaSchema.fields().size(), STRUCT_FIELDS.size()); for (String name: STRUCT_FIELDS) { assertEquals(kafkaSchema.field(name).name(), name); + // set by schema + assertEquals(kafkaSchema.field(name).schema().isOptional(), + kafkaSchemaOpt.field(name).schema().isOptional()); } } @@ -266,11 +312,27 @@ public void jsonComplexSchemaTest() { .withAlwaysAllowNull(false) .build()); org.apache.kafka.connect.data.Schema kafkaSchema = - PulsarSchemaToKafkaSchema.getKafkaConnectSchema(jsonSchema); + PulsarSchemaToKafkaSchema.getKafkaConnectSchema(jsonSchema, false); assertEquals(kafkaSchema.type(), org.apache.kafka.connect.data.Schema.Type.STRUCT); assertEquals(kafkaSchema.fields().size(), COMPLEX_STRUCT_FIELDS.size()); for (String name: COMPLEX_STRUCT_FIELDS) { assertEquals(kafkaSchema.field(name).name(), name); + assertFalse(kafkaSchema.field(name).schema().isOptional()); + } + + kafkaSchema = + PulsarSchemaToKafkaSchema.getKafkaConnectSchema(jsonSchema, true); + assertEquals(kafkaSchema.type(), org.apache.kafka.connect.data.Schema.Type.STRUCT); + assertEquals(kafkaSchema.fields().size(), COMPLEX_STRUCT_FIELDS.size()); + for (String name: COMPLEX_STRUCT_FIELDS) { + assertEquals(kafkaSchema.field(name).name(), name); + assertFalse(kafkaSchema.field(name).schema().isOptional()); + + if (kafkaSchema.field(name).schema().type().isPrimitive()) { + // false because .withAlwaysAllowNull(false), avroschema values are used + assertFalse(kafkaSchema.field(name).schema().isOptional(), + kafkaSchema.field(name).schema().type().getName()); + } } } @@ -308,39 +370,40 @@ public void castToKafkaSchemaTest() { @Test public void dateSchemaTest() { org.apache.kafka.connect.data.Schema kafkaSchema = - PulsarSchemaToKafkaSchema.getKafkaConnectSchema(Schema.DATE); + PulsarSchemaToKafkaSchema.getKafkaConnectSchema(Schema.DATE, true); assertEquals(kafkaSchema.type(), Date.SCHEMA.type()); + assertFalse(kafkaSchema.isOptional()); } // not supported schemas below: @Test(expectedExceptions = IllegalStateException.class) public void timeSchemaTest() { - PulsarSchemaToKafkaSchema.getKafkaConnectSchema(Schema.TIME); + PulsarSchemaToKafkaSchema.getKafkaConnectSchema(Schema.TIME, false); } @Test(expectedExceptions = IllegalStateException.class) public void timestampSchemaTest() { - PulsarSchemaToKafkaSchema.getKafkaConnectSchema(Schema.TIMESTAMP); + PulsarSchemaToKafkaSchema.getKafkaConnectSchema(Schema.TIMESTAMP, false); } @Test(expectedExceptions = IllegalStateException.class) public void instantSchemaTest() { - PulsarSchemaToKafkaSchema.getKafkaConnectSchema(Schema.INSTANT); + PulsarSchemaToKafkaSchema.getKafkaConnectSchema(Schema.INSTANT, false); } @Test(expectedExceptions = IllegalStateException.class) public void localDateSchemaTest() { - PulsarSchemaToKafkaSchema.getKafkaConnectSchema(Schema.LOCAL_DATE); + PulsarSchemaToKafkaSchema.getKafkaConnectSchema(Schema.LOCAL_DATE, false); } @Test(expectedExceptions = IllegalStateException.class) public void localTimeSchemaTest() { - PulsarSchemaToKafkaSchema.getKafkaConnectSchema(Schema.LOCAL_TIME); + PulsarSchemaToKafkaSchema.getKafkaConnectSchema(Schema.LOCAL_TIME, false); } @Test(expectedExceptions = IllegalStateException.class) public void localDatetimeSchemaTest() { - PulsarSchemaToKafkaSchema.getKafkaConnectSchema(Schema.LOCAL_DATE_TIME); + PulsarSchemaToKafkaSchema.getKafkaConnectSchema(Schema.LOCAL_DATE_TIME, false); } }