From 7802c264c96ae27167cf38c263b86398aa0ea3fe Mon Sep 17 00:00:00 2001 From: Yash Mayya Date: Thu, 17 Aug 2023 19:13:53 +0100 Subject: [PATCH] MINOR: Allow writing tombstone offsets for arbitrary partitions in the FileStreamSourceConnector (#14234) Reviewers: Chris Egerton --- .../file/FileStreamSourceConnector.java | 21 ++++++++++--------- .../file/FileStreamSourceConnectorTest.java | 14 +++++++++++++ 2 files changed, 25 insertions(+), 10 deletions(-) diff --git a/connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSourceConnector.java b/connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSourceConnector.java index 13193f8f50124..37cdcec1b053e 100644 --- a/connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSourceConnector.java +++ b/connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSourceConnector.java @@ -117,18 +117,10 @@ public boolean alterOffsets(Map connectorConfig, Map, Map> partitionOffset : offsets.entrySet()) { - Map partition = partitionOffset.getKey(); - if (partition == null) { - throw new ConnectException("Partition objects cannot be null"); - } - - if (!partition.containsKey(FILENAME_FIELD)) { - throw new ConnectException("Partition objects should contain the key '" + FILENAME_FIELD + "'"); - } - Map offset = partitionOffset.getValue(); - // null offsets are allowed and represent a deletion of offsets for a partition if (offset == null) { + // We allow tombstones for anything; if there's garbage in the offsets for the connector, we don't + // want to prevent users from being able to clean it up using the REST API continue; } @@ -145,6 +137,15 @@ public boolean alterOffsets(Map connectorConfig, Map partition = partitionOffset.getKey(); + if (partition == null) { + throw new ConnectException("Partition objects cannot be null"); + } + + if (!partition.containsKey(FILENAME_FIELD)) { + throw new ConnectException("Partition objects should contain the key '" + FILENAME_FIELD + "'"); + } } // Let the task check whether the actual value for the offset position is valid for the configured file on startup diff --git a/connect/file/src/test/java/org/apache/kafka/connect/file/FileStreamSourceConnectorTest.java b/connect/file/src/test/java/org/apache/kafka/connect/file/FileStreamSourceConnectorTest.java index 185faa80eb34a..41915913b03e3 100644 --- a/connect/file/src/test/java/org/apache/kafka/connect/file/FileStreamSourceConnectorTest.java +++ b/connect/file/src/test/java/org/apache/kafka/connect/file/FileStreamSourceConnectorTest.java @@ -227,4 +227,18 @@ public void testSuccessfulAlterOffsets() { assertTrue(connector.alterOffsets(sourceProperties, offsets)); assertTrue(connector.alterOffsets(sourceProperties, new HashMap<>())); } + + @Test + public void testAlterOffsetsTombstones() { + Function, Boolean> alterOffsets = partition -> connector.alterOffsets( + sourceProperties, + Collections.singletonMap(partition, null) + ); + + assertTrue(alterOffsets.apply(null)); + assertTrue(alterOffsets.apply(Collections.emptyMap())); + assertTrue(alterOffsets.apply(Collections.singletonMap(FILENAME_FIELD, FILENAME))); + assertTrue(alterOffsets.apply(Collections.singletonMap(FILENAME_FIELD, "/someotherfilename"))); + assertTrue(alterOffsets.apply(Collections.singletonMap("garbage_partition_key", "garbage_partition_value"))); + } }