diff --git a/connect/api/src/main/java/org/apache/kafka/connect/data/SchemaBuilder.java b/connect/api/src/main/java/org/apache/kafka/connect/data/SchemaBuilder.java index 3d2f5cca8427..32045f965122 100644 --- a/connect/api/src/main/java/org/apache/kafka/connect/data/SchemaBuilder.java +++ b/connect/api/src/main/java/org/apache/kafka/connect/data/SchemaBuilder.java @@ -181,7 +181,7 @@ public SchemaBuilder doc(String doc) { @Override public Map parameters() { - return Collections.unmodifiableMap(parameters); + return parameters == null ? null : Collections.unmodifiableMap(parameters); } /** diff --git a/connect/api/src/test/java/org/apache/kafka/connect/data/SchemaBuilderTest.java b/connect/api/src/test/java/org/apache/kafka/connect/data/SchemaBuilderTest.java index 62020f372f5c..fdbaa0ae15e2 100644 --- a/connect/api/src/test/java/org/apache/kafka/connect/data/SchemaBuilderTest.java +++ b/connect/api/src/test/java/org/apache/kafka/connect/data/SchemaBuilderTest.java @@ -252,7 +252,14 @@ public void testArrayBuilderInvalidDefault() { @Test public void testMapBuilder() { - Schema schema = SchemaBuilder.map(Schema.INT8_SCHEMA, Schema.INT8_SCHEMA).build(); + // SchemaBuilder should also pass the check + Schema schema = SchemaBuilder.map(Schema.INT8_SCHEMA, Schema.INT8_SCHEMA); + assertTypeAndDefault(schema, Schema.Type.MAP, false, null); + assertEquals(schema.keySchema(), Schema.INT8_SCHEMA); + assertEquals(schema.valueSchema(), Schema.INT8_SCHEMA); + assertNoMetadata(schema); + + schema = SchemaBuilder.map(Schema.INT8_SCHEMA, Schema.INT8_SCHEMA).build(); assertTypeAndDefault(schema, Schema.Type.MAP, false, null); assertEquals(schema.keySchema(), Schema.INT8_SCHEMA); assertEquals(schema.valueSchema(), Schema.INT8_SCHEMA); diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java index 9a93a4e48a22..af8efee8e9d8 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java @@ -179,7 +179,7 @@ public static String COMMIT_TASKS_KEY(String connectorName) { // converter/serializer changes causing keys to change. We need to absolutely ensure that the keys remain precisely // the same. public static final Schema CONNECTOR_CONFIGURATION_V0 = SchemaBuilder.struct() - .field("properties", SchemaBuilder.map(Schema.STRING_SCHEMA, Schema.OPTIONAL_STRING_SCHEMA)) + .field("properties", SchemaBuilder.map(Schema.STRING_SCHEMA, Schema.OPTIONAL_STRING_SCHEMA).build()) .build(); public static final Schema TASK_CONFIGURATION_V0 = CONNECTOR_CONFIGURATION_V0; public static final Schema CONNECTOR_TASKS_COMMIT_V0 = SchemaBuilder.struct()