Skip to content

Commit

Permalink
[ISSUE apache#6633] [RIP-65] Improving Tiered Storage Implementation (a…
Browse files Browse the repository at this point in the history
…pache#6781)

[ISSUE apache#6633] [RIP-65] Improving Tiered Storage Implementation
  • Loading branch information
lizhimins authored Jun 9, 2023
1 parent 6c9359f commit 6eac107
Show file tree
Hide file tree
Showing 83 changed files with 3,477 additions and 4,614 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -732,51 +732,79 @@ private void updateNamesrvAddr() {
}
}

public boolean initialize() throws CloneNotSupportedException {

public boolean initializeMetadata() {
boolean result = this.topicConfigManager.load();
result = result && this.topicQueueMappingManager.load();
result = result && this.consumerOffsetManager.load();
result = result && this.subscriptionGroupManager.load();
result = result && this.consumerFilterManager.load();
result = result && this.consumerOrderInfoManager.load();
return result;
}

if (result) {
try {
DefaultMessageStore defaultMessageStore = new DefaultMessageStore(this.messageStoreConfig, this.brokerStatsManager, this.messageArrivingListener, this.brokerConfig, topicConfigManager.getTopicConfigTable());
public boolean initializeMessageStore() {
boolean result = true;
try {
DefaultMessageStore defaultMessageStore = new DefaultMessageStore(this.messageStoreConfig, this.brokerStatsManager, this.messageArrivingListener, this.brokerConfig, topicConfigManager.getTopicConfigTable());

if (messageStoreConfig.isEnableDLegerCommitLog()) {
DLedgerRoleChangeHandler roleChangeHandler = new DLedgerRoleChangeHandler(this, defaultMessageStore);
((DLedgerCommitLog) defaultMessageStore.getCommitLog()).getdLedgerServer().getDLedgerLeaderElector().addRoleChangeHandler(roleChangeHandler);
}
this.brokerStats = new BrokerStats(defaultMessageStore);
//load plugin
MessageStorePluginContext context = new MessageStorePluginContext(messageStoreConfig, brokerStatsManager, messageArrivingListener, brokerConfig, configuration);
this.messageStore = MessageStoreFactory.build(context, defaultMessageStore);
this.messageStore.getDispatcherList().addFirst(new CommitLogDispatcherCalcBitMap(this.brokerConfig, this.consumerFilterManager));
if (this.brokerConfig.isEnableControllerMode()) {
this.replicasManager = new ReplicasManager(this);
}
if (messageStoreConfig.isTimerWheelEnable()) {
this.timerCheckpoint = new TimerCheckpoint(BrokerPathConfigHelper.getTimerCheckPath(messageStoreConfig.getStorePathRootDir()));
TimerMetrics timerMetrics = new TimerMetrics(BrokerPathConfigHelper.getTimerMetricsPath(messageStoreConfig.getStorePathRootDir()));
this.timerMessageStore = new TimerMessageStore(messageStore, messageStoreConfig, timerCheckpoint, timerMetrics, brokerStatsManager);
this.timerMessageStore.registerEscapeBridgeHook(msg -> escapeBridge.putMessage(msg));
this.messageStore.setTimerMessageStore(this.timerMessageStore);
}
} catch (IOException e) {
result = false;
LOG.error("BrokerController#initialize: unexpected error occurs", e);
if (messageStoreConfig.isEnableDLegerCommitLog()) {
DLedgerRoleChangeHandler roleChangeHandler =
new DLedgerRoleChangeHandler(this, defaultMessageStore);
((DLedgerCommitLog) defaultMessageStore.getCommitLog())
.getdLedgerServer().getDLedgerLeaderElector().addRoleChangeHandler(roleChangeHandler);
}

this.brokerStats = new BrokerStats(defaultMessageStore);

// Load store plugin
MessageStorePluginContext context = new MessageStorePluginContext(
messageStoreConfig, brokerStatsManager, messageArrivingListener, brokerConfig, configuration);
this.messageStore = MessageStoreFactory.build(context, defaultMessageStore);
this.messageStore.getDispatcherList().addFirst(new CommitLogDispatcherCalcBitMap(this.brokerConfig, this.consumerFilterManager));
if (this.brokerConfig.isEnableControllerMode()) {
this.replicasManager = new ReplicasManager(this);
}
if (messageStoreConfig.isTimerWheelEnable()) {
this.timerCheckpoint = new TimerCheckpoint(BrokerPathConfigHelper.getTimerCheckPath(messageStoreConfig.getStorePathRootDir()));
TimerMetrics timerMetrics = new TimerMetrics(BrokerPathConfigHelper.getTimerMetricsPath(messageStoreConfig.getStorePathRootDir()));
this.timerMessageStore = new TimerMessageStore(messageStore, messageStoreConfig, timerCheckpoint, timerMetrics, brokerStatsManager);
this.timerMessageStore.registerEscapeBridgeHook(msg -> escapeBridge.putMessage(msg));
this.messageStore.setTimerMessageStore(this.timerMessageStore);
}
} catch (IOException e) {
result = false;
LOG.error("BrokerController#initialize: unexpected error occurs", e);
}
return result;
}

public boolean initialize() throws CloneNotSupportedException {

boolean result = this.initializeMetadata();
if (!result) {
return false;
}

if (this.brokerConfig.isEnableControllerMode()) {
this.replicasManager.setFenced(true);
}


result = this.initializeMessageStore();
if (!result) {
return false;
}

return this.recoverAndInitService();
}

public boolean recoverAndInitService() throws CloneNotSupportedException {

boolean result = true;

if (messageStore != null) {
registerMessageStoreHook();
result = result && this.messageStore.load();
result = this.messageStore.load();
}

if (messageStoreConfig.isTimerWheelEnable()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,4 +42,8 @@ public class PopAckConstants {
public static String buildClusterReviveTopic(String clusterName) {
return PopAckConstants.REVIVE_TOPIC + clusterName;
}

public static boolean isStartWithRevivePrefix(String topicName) {
return topicName != null && topicName.startsWith(REVIVE_TOPIC);
}
}
31 changes: 0 additions & 31 deletions distribution/conf/tieredstorage/brokerS3.conf

This file was deleted.

Loading

0 comments on commit 6eac107

Please sign in to comment.