Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Improve] Refactor the notification sending logic. #2180

Merged
merged 14 commits into from
Jul 4, 2024
Merged
Show file tree
Hide file tree
Changes from 11 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,11 @@
public class AlerterWorkerPool {

private ThreadPoolExecutor workerExecutor;
private ThreadPoolExecutor notifyExecutor;

public AlerterWorkerPool() {
initWorkExecutor();
initNotifyExecutor();
}

private void initWorkExecutor() {
Expand All @@ -57,6 +59,24 @@ private void initWorkExecutor() {
new ThreadPoolExecutor.AbortPolicy());
}

private void initNotifyExecutor() {
ThreadFactory threadFactory = new ThreadFactoryBuilder()
.setUncaughtExceptionHandler((thread, throwable) -> {
log.error("notifyExecutor has uncaughtException.");
log.error(throwable.getMessage(), throwable);
})
.setDaemon(true)
.setNameFormat("notify-worker-%d")
.build();
notifyExecutor = new ThreadPoolExecutor(6,
10,
10,
TimeUnit.SECONDS,
new SynchronousQueue<>(),
threadFactory,
new ThreadPoolExecutor.AbortPolicy());
}

/**
* Run the alerter task
* @param runnable task
Expand All @@ -65,4 +85,15 @@ private void initWorkExecutor() {
public void executeJob(Runnable runnable) throws RejectedExecutionException {
workerExecutor.execute(runnable);
}

/**
* Executes the given runnable task using the notifyExecutor.
*
* @param runnable the task to be executed
* @throws RejectedExecutionException if the task cannot be accepted for execution
*/
public void executeNotify(Runnable runnable) throws RejectedExecutionException {
notifyExecutor.execute(runnable);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import com.google.common.collect.Maps;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.ServiceLoader;
import lombok.extern.slf4j.Slf4j;
import org.apache.hertzbeat.alert.AlerterWorkerPool;
Expand Down Expand Up @@ -99,8 +100,8 @@ private NoticeTemplate getOneTemplateById(Long id) {
return noticeConfigService.getOneTemplateById(id);
}

private List<NoticeRule> matchNoticeRulesByAlert(Alert alert) {
return noticeConfigService.getReceiverFilterRule(alert);
private Optional<List<NoticeRule>> matchNoticeRulesByAlert(Alert alert) {
return Optional.ofNullable(noticeConfigService.getReceiverFilterRule(alert));
}

private class DispatchTask implements Runnable {
Expand Down Expand Up @@ -131,29 +132,21 @@ public void run() {
}

private void sendNotify(Alert alert) {
List<NoticeRule> noticeRules = matchNoticeRulesByAlert(alert);
// todo Send notification here temporarily single thread
if (noticeRules != null) {
for (NoticeRule rule : noticeRules) {
try {
if (rule.getTemplateId() == null) {
List<Long> receiverIdList = rule.getReceiverId();
for (Long receiverId : receiverIdList) {
sendNoticeMsg(getOneReceiverById(receiverId),
null, alert);
}
} else {
List<Long> receiverIdList = rule.getReceiverId();
for (Long receiverId : receiverIdList) {
sendNoticeMsg(getOneReceiverById(receiverId),
getOneTemplateById(rule.getTemplateId()), alert);
}
}
} catch (AlertNoticeException e) {
log.warn("DispatchTask sendNoticeMsg error, message: {}", e.getMessage());
}
}
}
matchNoticeRulesByAlert(alert).ifPresent(noticeRules -> {
noticeRules.forEach(rule -> {
workerPool.executeNotify(() -> {
rule.getReceiverId().stream().filter(receiverId -> rule.getTemplateId() != null)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

hi, the code logic here is to filter out cases where the template id is null, but before the template id was null it would go to use the default file notification template.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you for the reminder. I adjusted the logic;
I moved the template query earlier so it can return exceptions faster, and the code can be more concise.

.forEach(receiverId -> {
try {
sendNoticeMsg(getOneReceiverById(receiverId),
getOneTemplateById(rule.getTemplateId()), alert);
} catch (AlertNoticeException e) {
log.warn("DispatchTask sendNoticeMsg error, message: {}", e.getMessage());
}
});
});
});
});
}
}

Expand Down
Loading