From 672ea644f002e9d5a858a8c19b95de1199bcca5c Mon Sep 17 00:00:00 2001 From: Kamal Chandraprakash Date: Sat, 9 Sep 2023 19:02:42 +0530 Subject: [PATCH] MINOR: Removed the RSM and RLMM classpath config validator (#14358) - RSM and RLMM classpath can be empty since it's optional so removed the non-empty string validator - Fix getting the `localTieredStorage` by brokerId after stopping a broker. Reviewers: Christo Lolov , Luke Chen , Satish Duggana --- .../log/remote/storage/RemoteLogManagerConfig.java | 4 ++-- .../server/log/remote/storage/LocalTieredStorage.java | 4 ++++ .../tiered/storage/TieredStorageTestContext.java | 11 +++++++++-- 3 files changed, 15 insertions(+), 4 deletions(-) diff --git a/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfig.java b/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfig.java index f636363971325..32dcfe3731183 100644 --- a/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfig.java +++ b/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfig.java @@ -171,7 +171,7 @@ public final class RemoteLogManagerConfig { REMOTE_STORAGE_MANAGER_CLASS_NAME_DOC) .define(REMOTE_STORAGE_MANAGER_CLASS_PATH_PROP, STRING, null, - new ConfigDef.NonEmptyString(), + null, MEDIUM, REMOTE_STORAGE_MANAGER_CLASS_PATH_DOC) .define(REMOTE_LOG_METADATA_MANAGER_CLASS_NAME_PROP, @@ -183,7 +183,7 @@ public final class RemoteLogManagerConfig { .define(REMOTE_LOG_METADATA_MANAGER_CLASS_PATH_PROP, STRING, null, - new ConfigDef.NonEmptyString(), + null, MEDIUM, REMOTE_LOG_METADATA_MANAGER_CLASS_PATH_DOC) .define(REMOTE_LOG_METADATA_MANAGER_LISTENER_NAME_PROP, STRING, diff --git a/storage/src/test/java/org/apache/kafka/server/log/remote/storage/LocalTieredStorage.java b/storage/src/test/java/org/apache/kafka/server/log/remote/storage/LocalTieredStorage.java index 43c09ccd908eb..64131d155590b 100644 --- a/storage/src/test/java/org/apache/kafka/server/log/remote/storage/LocalTieredStorage.java +++ b/storage/src/test/java/org/apache/kafka/server/log/remote/storage/LocalTieredStorage.java @@ -561,4 +561,8 @@ private RemoteLogSegmentFileset.RemoteLogSegmentFileType getLogSegmentFileType(I } return SEGMENT; } + + public int brokerId() { + return brokerId; + } } diff --git a/storage/src/test/java/org/apache/kafka/tiered/storage/TieredStorageTestContext.java b/storage/src/test/java/org/apache/kafka/tiered/storage/TieredStorageTestContext.java index 1975a1690cfc3..59acae74ad3f9 100644 --- a/storage/src/test/java/org/apache/kafka/tiered/storage/TieredStorageTestContext.java +++ b/storage/src/test/java/org/apache/kafka/tiered/storage/TieredStorageTestContext.java @@ -271,11 +271,18 @@ public TopicSpec topicSpec(String topicName) { public LocalTieredStorageSnapshot takeTieredStorageSnapshot() { int aliveBrokerId = harness.aliveBrokers().head().config().brokerId(); - return LocalTieredStorageSnapshot.takeSnapshot(remoteStorageManagers.get(aliveBrokerId)); + return LocalTieredStorageSnapshot.takeSnapshot(remoteStorageManager(aliveBrokerId)); } public LocalTieredStorageHistory tieredStorageHistory(int brokerId) { - return remoteStorageManagers.get(brokerId).getHistory(); + return remoteStorageManager(brokerId).getHistory(); + } + + public LocalTieredStorage remoteStorageManager(int brokerId) { + return remoteStorageManagers.stream() + .filter(rsm -> rsm.brokerId() == brokerId) + .findFirst() + .orElseThrow(() -> new IllegalArgumentException("No remote storage manager found for broker " + brokerId)); } public List remoteStorageManagers() {