Skip to content

Commit

Permalink
[Improve] Refactor the notification sending logic. (#2180)
Browse files Browse the repository at this point in the history
Co-authored-by: tomsun28 <[email protected]>
  • Loading branch information
crossoverJie and tomsun28 authored Jul 4, 2024
1 parent 6222c51 commit 0699e89
Show file tree
Hide file tree
Showing 4 changed files with 61 additions and 39 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,11 @@
public class AlerterWorkerPool {

private ThreadPoolExecutor workerExecutor;
private ThreadPoolExecutor notifyExecutor;

public AlerterWorkerPool() {
initWorkExecutor();
initNotifyExecutor();
}

private void initWorkExecutor() {
Expand All @@ -57,6 +59,24 @@ private void initWorkExecutor() {
new ThreadPoolExecutor.AbortPolicy());
}

private void initNotifyExecutor() {
ThreadFactory threadFactory = new ThreadFactoryBuilder()
.setUncaughtExceptionHandler((thread, throwable) -> {
log.error("notifyExecutor has uncaughtException.");
log.error(throwable.getMessage(), throwable);
})
.setDaemon(true)
.setNameFormat("notify-worker-%d")
.build();
notifyExecutor = new ThreadPoolExecutor(6,
10,
10,
TimeUnit.SECONDS,
new SynchronousQueue<>(),
threadFactory,
new ThreadPoolExecutor.AbortPolicy());
}

/**
* Run the alerter task
* @param runnable task
Expand All @@ -65,4 +85,15 @@ private void initWorkExecutor() {
public void executeJob(Runnable runnable) throws RejectedExecutionException {
workerExecutor.execute(runnable);
}

/**
* Executes the given runnable task using the notifyExecutor.
*
* @param runnable the task to be executed
* @throws RejectedExecutionException if the task cannot be accepted for execution
*/
public void executeNotify(Runnable runnable) throws RejectedExecutionException {
notifyExecutor.execute(runnable);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import com.google.common.collect.Maps;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.ServiceLoader;
import lombok.extern.slf4j.Slf4j;
import org.apache.hertzbeat.alert.AlerterWorkerPool;
Expand Down Expand Up @@ -85,7 +86,15 @@ public boolean sendNoticeMsg(NoticeReceiver receiver, NoticeTemplate noticeTempl
}
byte type = receiver.getType();
if (alertNotifyHandlerMap.containsKey(type)) {
alertNotifyHandlerMap.get(type).send(receiver, noticeTemplate, alert);
AlertNotifyHandler alertNotifyHandler = alertNotifyHandlerMap.get(type);
if (noticeTemplate == null) {
noticeTemplate = noticeConfigService.getDefaultNoticeTemplateByType(alertNotifyHandler.type());
}
if (noticeTemplate == null) {
log.error("alert does not have mapping default notice template. type: {}.", alertNotifyHandler.type());
throw new NullPointerException(alertNotifyHandler.type() + " does not have mapping default notice template");
}
alertNotifyHandler.send(receiver, noticeTemplate, alert);
return true;
}
return false;
Expand All @@ -96,11 +105,14 @@ private NoticeReceiver getOneReceiverById(Long id) {
}

private NoticeTemplate getOneTemplateById(Long id) {
if (id == null) {
return null;
}
return noticeConfigService.getOneTemplateById(id);
}

private List<NoticeRule> matchNoticeRulesByAlert(Alert alert) {
return noticeConfigService.getReceiverFilterRule(alert);
private Optional<List<NoticeRule>> matchNoticeRulesByAlert(Alert alert) {
return Optional.ofNullable(noticeConfigService.getReceiverFilterRule(alert));
}

private class DispatchTask implements Runnable {
Expand Down Expand Up @@ -131,29 +143,21 @@ public void run() {
}

private void sendNotify(Alert alert) {
List<NoticeRule> noticeRules = matchNoticeRulesByAlert(alert);
// todo Send notification here temporarily single thread
if (noticeRules != null) {
for (NoticeRule rule : noticeRules) {
try {
if (rule.getTemplateId() == null) {
List<Long> receiverIdList = rule.getReceiverId();
for (Long receiverId : receiverIdList) {
sendNoticeMsg(getOneReceiverById(receiverId),
null, alert);
}
} else {
List<Long> receiverIdList = rule.getReceiverId();
for (Long receiverId : receiverIdList) {
sendNoticeMsg(getOneReceiverById(receiverId),
getOneTemplateById(rule.getTemplateId()), alert);
}
}
} catch (AlertNoticeException e) {
log.warn("DispatchTask sendNoticeMsg error, message: {}", e.getMessage());
}
}
}
matchNoticeRulesByAlert(alert).ifPresent(noticeRules -> {
noticeRules.forEach(rule -> {
workerPool.executeNotify(() -> {
rule.getReceiverId()
.forEach(receiverId -> {
try {
sendNoticeMsg(getOneReceiverById(receiverId),
getOneTemplateById(rule.getTemplateId()), alert);
} catch (AlertNoticeException e) {
log.warn("DispatchTask sendNoticeMsg error, message: {}", e.getMessage());
}
});
});
});
});
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,13 +107,6 @@ protected String renderContent(NoticeTemplate noticeTemplate, Alert alert) throw
model.put("content", alert.getContent());
model.put("tagsLabel", bundle.getString("alerter.notify.tags"));
model.put("tags", alert.getTags());
if (noticeTemplate == null) {
noticeTemplate = noticeConfigService.getDefaultNoticeTemplateByType(type());
}
if (noticeTemplate == null) {
log.error("alert does not have mapping default notice template. type: {}.", type());
throw new NullPointerException(type() + " does not have mapping default notice template");
}
// Single instance reuse cache considers mulitple-threading issues
String templateName = "freeMakerTemplate";
stringLoader.putTemplate(templateName, noticeTemplate.getContent());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,12 +95,6 @@ public String buildAlertHtmlTemplate(final Alert alert, NoticeTemplate noticeTem
model.put("consoleUrl", alerterProperties.getConsoleUrl());
model.put("nameContent", bundle.getString("alerter.notify.content"));
model.put("content", alert.getContent());
if (noticeTemplate == null) {
noticeTemplate = noticeConfigService.getDefaultNoticeTemplateByType((byte) 1);
}
if (noticeTemplate == null) {
throw new NullPointerException("email does not have mapping default notice template");
}
StringTemplateLoader stringLoader = new StringTemplateLoader();
String templateName = "mailTemplate";
stringLoader.putTemplate(templateName, noticeTemplate.getContent());
Expand Down

0 comments on commit 0699e89

Please sign in to comment.