Skip to content

Commit

Permalink
MINOR: Allow writing tombstone offsets for arbitrary partitions in th…
Browse files Browse the repository at this point in the history
…e FileStreamSourceConnector (apache#14234)

Reviewers: Chris Egerton <[email protected]>
  • Loading branch information
yashmayya authored Aug 17, 2023
1 parent a253dc6 commit 7802c26
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -117,18 +117,10 @@ public boolean alterOffsets(Map<String, String> connectorConfig, Map<Map<String,
// This connector makes use of a single source partition at a time which represents the file that it is configured to read from.
// However, there could also be source partitions from previous configurations of the connector.
for (Map.Entry<Map<String, ?>, Map<String, ?>> partitionOffset : offsets.entrySet()) {
Map<String, ?> partition = partitionOffset.getKey();
if (partition == null) {
throw new ConnectException("Partition objects cannot be null");
}

if (!partition.containsKey(FILENAME_FIELD)) {
throw new ConnectException("Partition objects should contain the key '" + FILENAME_FIELD + "'");
}

Map<String, ?> offset = partitionOffset.getValue();
// null offsets are allowed and represent a deletion of offsets for a partition
if (offset == null) {
// We allow tombstones for anything; if there's garbage in the offsets for the connector, we don't
// want to prevent users from being able to clean it up using the REST API
continue;
}

Expand All @@ -145,6 +137,15 @@ public boolean alterOffsets(Map<String, String> connectorConfig, Map<Map<String,
if (offsetPosition < 0) {
throw new ConnectException("The value for the '" + POSITION_FIELD + "' key in the offset should be a non-negative value");
}

Map<String, ?> partition = partitionOffset.getKey();
if (partition == null) {
throw new ConnectException("Partition objects cannot be null");
}

if (!partition.containsKey(FILENAME_FIELD)) {
throw new ConnectException("Partition objects should contain the key '" + FILENAME_FIELD + "'");
}
}

// Let the task check whether the actual value for the offset position is valid for the configured file on startup
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -227,4 +227,18 @@ public void testSuccessfulAlterOffsets() {
assertTrue(connector.alterOffsets(sourceProperties, offsets));
assertTrue(connector.alterOffsets(sourceProperties, new HashMap<>()));
}

@Test
public void testAlterOffsetsTombstones() {
Function<Map<String, ?>, Boolean> alterOffsets = partition -> connector.alterOffsets(
sourceProperties,
Collections.singletonMap(partition, null)
);

assertTrue(alterOffsets.apply(null));
assertTrue(alterOffsets.apply(Collections.emptyMap()));
assertTrue(alterOffsets.apply(Collections.singletonMap(FILENAME_FIELD, FILENAME)));
assertTrue(alterOffsets.apply(Collections.singletonMap(FILENAME_FIELD, "/someotherfilename")));
assertTrue(alterOffsets.apply(Collections.singletonMap("garbage_partition_key", "garbage_partition_value")));
}
}

0 comments on commit 7802c26

Please sign in to comment.