Skip to content

Commit

Permalink
MINOR: Removed the RSM and RLMM classpath config validator (apache#14358
Browse files Browse the repository at this point in the history
)

- RSM and RLMM classpath can be empty since it's optional so removed the non-empty string validator
- Fix getting the `localTieredStorage` by brokerId after stopping a broker.

Reviewers: Christo Lolov <[email protected]>, Luke Chen <[email protected]>, Satish Duggana <[email protected]>
  • Loading branch information
kamalcph authored Sep 9, 2023
1 parent b24ccd6 commit 672ea64
Show file tree
Hide file tree
Showing 3 changed files with 15 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ public final class RemoteLogManagerConfig {
REMOTE_STORAGE_MANAGER_CLASS_NAME_DOC)
.define(REMOTE_STORAGE_MANAGER_CLASS_PATH_PROP, STRING,
null,
new ConfigDef.NonEmptyString(),
null,
MEDIUM,
REMOTE_STORAGE_MANAGER_CLASS_PATH_DOC)
.define(REMOTE_LOG_METADATA_MANAGER_CLASS_NAME_PROP,
Expand All @@ -183,7 +183,7 @@ public final class RemoteLogManagerConfig {
.define(REMOTE_LOG_METADATA_MANAGER_CLASS_PATH_PROP,
STRING,
null,
new ConfigDef.NonEmptyString(),
null,
MEDIUM,
REMOTE_LOG_METADATA_MANAGER_CLASS_PATH_DOC)
.define(REMOTE_LOG_METADATA_MANAGER_LISTENER_NAME_PROP, STRING,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -561,4 +561,8 @@ private RemoteLogSegmentFileset.RemoteLogSegmentFileType getLogSegmentFileType(I
}
return SEGMENT;
}

public int brokerId() {
return brokerId;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -271,11 +271,18 @@ public TopicSpec topicSpec(String topicName) {

public LocalTieredStorageSnapshot takeTieredStorageSnapshot() {
int aliveBrokerId = harness.aliveBrokers().head().config().brokerId();
return LocalTieredStorageSnapshot.takeSnapshot(remoteStorageManagers.get(aliveBrokerId));
return LocalTieredStorageSnapshot.takeSnapshot(remoteStorageManager(aliveBrokerId));
}

public LocalTieredStorageHistory tieredStorageHistory(int brokerId) {
return remoteStorageManagers.get(brokerId).getHistory();
return remoteStorageManager(brokerId).getHistory();
}

public LocalTieredStorage remoteStorageManager(int brokerId) {
return remoteStorageManagers.stream()
.filter(rsm -> rsm.brokerId() == brokerId)
.findFirst()
.orElseThrow(() -> new IllegalArgumentException("No remote storage manager found for broker " + brokerId));
}

public List<LocalTieredStorage> remoteStorageManagers() {
Expand Down

0 comments on commit 672ea64

Please sign in to comment.