Skip to content

Commit

Permalink
Reduce unnecessary switches
Browse files Browse the repository at this point in the history
  • Loading branch information
LetLetMe committed Oct 16, 2024
1 parent aae30a7 commit bf0489a
Show file tree
Hide file tree
Showing 2 changed files with 11 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -467,7 +467,7 @@ private RemotingCommand updateAndGetGroupForbidden(ChannelHandlerContext ctx, Re

/**
* diffResult: string
* checkStatus: int 0->error 1->checkSuccess 2->checkFalse
* checkStatus: int 0->ready, 1->notReady, 2->error
*/
private RemotingCommand checkRocksdbCqWriteProgress(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
CheckRocksdbCqWriteProgressRequestHeader requestHeader = request.decodeCommandCustomHeader(CheckRocksdbCqWriteProgressRequestHeader.class);
Expand All @@ -485,7 +485,7 @@ private RemotingCommand checkRocksdbCqWriteProgress(ChannelHandlerContext ctx, R
HashMap<String, String> resultMap = new HashMap<>();
if (!defaultMessageStore.getMessageStoreConfig().isRocksdbCQDoubleWriteEnable()) {
resultMap.put("diffResult", "rocksdbCQWriteEnable is false, checkRocksdbCqWriteProgressCommand is invalid");
resultMap.put("checkStatus", "0");
resultMap.put("checkStatus", "2");
response.setBody(JSON.toJSONBytes(resultMap));
return response;
}
Expand All @@ -496,7 +496,7 @@ private RemotingCommand checkRocksdbCqWriteProgress(ChannelHandlerContext ctx, R
if (StringUtils.isNotBlank(requestTopic)) {
boolean checkResult = processConsumeQueuesForTopic(cqTable.get(requestTopic), requestTopic, rocksDBMessageStore, diffResult, false, requestHeader.getCheckStoreTime());
resultMap.put("diffResult", diffResult.toString());
resultMap.put("checkStatus", checkResult ? "1" : "2");
resultMap.put("checkStatus", checkResult ? "0" : "1");
response.setBody(JSON.toJSONBytes(resultMap));
return response;
}
Expand All @@ -507,13 +507,13 @@ private RemotingCommand checkRocksdbCqWriteProgress(ChannelHandlerContext ctx, R
}
diffResult.append("check all topic successful, size:").append(cqTable.size());
resultMap.put("diffResult", diffResult.toString());
resultMap.put("checkStatus", checkResult ? "1" : "2");
resultMap.put("checkStatus", checkResult ? "0" : "1");
response.setBody(JSON.toJSONBytes(resultMap));

} catch (Exception e) {
LOGGER.error("CheckRocksdbCqWriteProgressCommand error", e);
resultMap.put("diffResult", e.getMessage());
resultMap.put("checkStatus", "0");
resultMap.put("checkStatus", "2");
response.setBody(JSON.toJSONBytes(resultMap));
}
return response;
Expand All @@ -535,7 +535,7 @@ private boolean processConsumeQueuesForTopic(ConcurrentMap<Integer, ConsumeQueue
// The latest message is earlier than the check time
Pair<CqUnit, Long> fileLatestCq = jsonCq.getCqUnitAndStoreTime(maxFileOffsetInQueue);
if (fileLatestCq != null) {
if (fileLatestCq.getObject2() < System.currentTimeMillis() - checkStoreTime) {
if (fileLatestCq.getObject2() < checkStoreTime) {
continue;
}
}
Expand All @@ -549,7 +549,7 @@ private boolean processConsumeQueuesForTopic(ConcurrentMap<Integer, ConsumeQueue
break;
}
Long earliestKcTime = kvCqUnit.getObject2();
if (earliestKcTime < System.currentTimeMillis() - checkStoreTime) {
if (earliestKcTime < checkStoreTime) {
continue;
}
if (!checkCqUnitEqual(kvCqUnit.getObject1(), fileCqUnit.getObject1())) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ public void execute(CommandLine commandLine, Options options, RPCHook rpcHook) {
String clusterName = commandLine.hasOption('c') ? commandLine.getOptionValue('c').trim() : "";
String topic = commandLine.hasOption('t') ? commandLine.getOptionValue('t').trim() : "";
// The default check is 30 days
long checkStoreTime = commandLine.hasOption("checkFrom") ? Long.parseLong(commandLine.getOptionValue("checkFrom").trim()) : 24 * 60 * 60 * 1000 * 30L;
long checkStoreTime = commandLine.hasOption("checkFrom") ? Long.parseLong(commandLine.getOptionValue("checkFrom").trim()) : System.currentTimeMillis() - 24 * 60 * 60 * 1000 * 30L;

try {
defaultMQAdminExt.start();
Expand All @@ -88,10 +88,10 @@ public void execute(CommandLine commandLine, Options options, RPCHook rpcHook) {
String brokerAddr = brokerData.getBrokerAddrs().get(0L);
CheckRocksdbCqWriteProgressResponseBody body = defaultMQAdminExt.checkRocksdbCqWriteProgress(brokerAddr, topic, checkStoreTime);
if (StringUtils.isNotBlank(topic)) {
System.out.print("checkStatus(0 -> error, 1 -> checkSuccess, 2 -> checkFalse): " + body.getCheckStatus() + " \n " + body.getDiffResult());
System.out.print("checkStatus(0 -> ready, 1 -> notReady, 2 -> error): " + body.getCheckStatus() + " \n " + body.getDiffResult() + "\n");
} else {
System.out.print("checkStatus(0 -> error, 1 -> checkSuccess, 2 -> checkFalse): " + body.getCheckStatus() + " \n " +
brokerName + " \n" + body.getDiffResult());
System.out.print("checkStatus(0 -> ready, 1 -> notReady, 2 -> error): " + body.getCheckStatus() + " \n " +
brokerName + " \n" + body.getDiffResult() + "\n");
}
}

Expand Down

0 comments on commit bf0489a

Please sign in to comment.