From 866a3115eb7cbadc29e917d7292e30dcf2c86656 Mon Sep 17 00:00:00 2001 From: Wenjun Ruan Date: Thu, 15 Sep 2022 10:52:21 +0800 Subject: [PATCH] Fix alert_send_status may throw duplicate key exception and add limit for query alert --- .../alert/AlertSenderService.java | 51 +++-- .../alert/runner/AlertSenderServiceTest.java | 1 + .../apache/dolphinscheduler/dao/AlertDao.java | 18 +- .../dolphinscheduler/dao/entity/Alert.java | 178 +----------------- .../dao/entity/AlertSendStatus.java | 23 +-- .../dao/mapper/AlertMapper.java | 9 +- .../dao/mapper/AlertSendStatusMapper.java | 4 + .../dao/mapper/AlertMapper.xml | 33 +++- .../dao/mapper/AlertSendStatusMapper.xml | 35 ++++ 9 files changed, 143 insertions(+), 209 deletions(-) create mode 100644 dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/AlertSendStatusMapper.xml diff --git a/dolphinscheduler-alert/dolphinscheduler-alert-server/src/main/java/org/apache/dolphinscheduler/alert/AlertSenderService.java b/dolphinscheduler-alert/dolphinscheduler-alert-server/src/main/java/org/apache/dolphinscheduler/alert/AlertSenderService.java index 92b3cea45be2..dcd83995efc3 100644 --- a/dolphinscheduler-alert/dolphinscheduler-alert-server/src/main/java/org/apache/dolphinscheduler/alert/AlertSenderService.java +++ b/dolphinscheduler-alert/dolphinscheduler-alert-server/src/main/java/org/apache/dolphinscheduler/alert/AlertSenderService.java @@ -17,8 +17,6 @@ package org.apache.dolphinscheduler.alert; -import com.google.common.collect.Lists; -import org.apache.commons.collections.CollectionUtils; import org.apache.dolphinscheduler.alert.api.AlertChannel; import org.apache.dolphinscheduler.alert.api.AlertConstants; import org.apache.dolphinscheduler.alert.api.AlertData; @@ -34,20 +32,28 @@ import org.apache.dolphinscheduler.dao.AlertDao; import org.apache.dolphinscheduler.dao.entity.Alert; import org.apache.dolphinscheduler.dao.entity.AlertPluginInstance; +import org.apache.dolphinscheduler.dao.entity.AlertSendStatus; import org.apache.dolphinscheduler.remote.command.alert.AlertSendResponseCommand; import org.apache.dolphinscheduler.remote.command.alert.AlertSendResponseResult; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.springframework.stereotype.Service; -import javax.annotation.Nullable; +import org.apache.commons.collections.CollectionUtils; + import java.util.ArrayList; +import java.util.Date; import java.util.List; import java.util.Map; import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; +import javax.annotation.Nullable; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.stereotype.Service; + +import com.google.common.collect.Lists; + @Service public final class AlertSenderService extends Thread { @@ -71,23 +77,29 @@ public synchronized void start() { @Override public void run() { - logger.info("alert sender started"); + logger.info("Alert sender thread started"); while (!ServerLifeCycleManager.isStopped()) { try { List alerts = alertDao.listPendingAlerts(); + if (CollectionUtils.isEmpty(alerts)) { + logger.debug("There is not waiting alerts"); + continue; + } AlertServerMetrics.registerPendingAlertGauge(alerts::size); this.send(alerts); - ThreadUtils.sleep(Constants.SLEEP_TIME_MILLIS * 5L); } catch (Exception e) { - logger.error("alert sender thread error", e); + logger.error("Alert sender thread meet an exception", e); + } finally { + ThreadUtils.sleep(Constants.SLEEP_TIME_MILLIS * 5L); } } + logger.info("Alert sender thread stopped"); } public void send(List alerts) { for (Alert alert : alerts) { // get alert group from alert - int alertId = Optional.ofNullable(alert.getId()).orElse(0); + int alertId = alert.getId(); int alertGroupId = Optional.ofNullable(alert.getAlertGroupId()).orElse(0); List alertInstanceList = alertDao.listInstanceByAlertGroupId(alertGroupId); if (CollectionUtils.isEmpty(alertInstanceList)) { @@ -107,16 +119,23 @@ public void send(List alerts) { .build(); int sendSuccessCount = 0; + List alertSendStatuses = new ArrayList<>(); List alertResults = new ArrayList<>(); for (AlertPluginInstance instance : alertInstanceList) { AlertResult alertResult = this.alertResultHandler(instance, alertData); if (alertResult != null) { - AlertStatus sendStatus = Boolean.parseBoolean(String.valueOf(alertResult.getStatus())) + AlertStatus sendStatus = Boolean.parseBoolean(alertResult.getStatus()) ? AlertStatus.EXECUTION_SUCCESS : AlertStatus.EXECUTION_FAILURE; - alertDao.addAlertSendStatus(sendStatus, JSONUtils.toJsonString(alertResult), alertId, - instance.getId()); - if (sendStatus.equals(AlertStatus.EXECUTION_SUCCESS)) { + AlertSendStatus alertSendStatus = AlertSendStatus.builder() + .alertId(alertId) + .alertPluginInstanceId(instance.getId()) + .sendStatus(sendStatus) + .log(JSONUtils.toJsonString(alertResult)) + .createTime(new Date()) + .build(); + alertSendStatuses.add(alertSendStatus); + if (AlertStatus.EXECUTION_SUCCESS.equals(sendStatus)) { sendSuccessCount++; AlertServerMetrics.incAlertSuccessCount(); } else { @@ -131,7 +150,11 @@ public void send(List alerts) { } else if (sendSuccessCount < alertInstanceList.size()) { alertStatus = AlertStatus.EXECUTION_PARTIAL_SUCCESS; } + // we update the alert first to avoid duplicate key in alertSendStatus + // this may loss the alertSendStatus if the server restart + // todo: use transaction to update these two table alertDao.updateAlert(alertStatus, JSONUtils.toJsonString(alertResults), alertId); + alertDao.insertAlertSendStatus(alertSendStatuses); } } diff --git a/dolphinscheduler-alert/dolphinscheduler-alert-server/src/test/java/org/apache/dolphinscheduler/alert/runner/AlertSenderServiceTest.java b/dolphinscheduler-alert/dolphinscheduler-alert-server/src/test/java/org/apache/dolphinscheduler/alert/runner/AlertSenderServiceTest.java index 0dcea24b311b..095cf2f758b2 100644 --- a/dolphinscheduler-alert/dolphinscheduler-alert-server/src/test/java/org/apache/dolphinscheduler/alert/runner/AlertSenderServiceTest.java +++ b/dolphinscheduler-alert/dolphinscheduler-alert-server/src/test/java/org/apache/dolphinscheduler/alert/runner/AlertSenderServiceTest.java @@ -154,6 +154,7 @@ public void testRun() { String content = "alert mail test content"; List alertList = new ArrayList<>(); Alert alert = new Alert(); + alert.setId(1); alert.setAlertGroupId(alertGroupId); alert.setTitle(title); alert.setContent(content); diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/AlertDao.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/AlertDao.java index d8a6730fb655..169aa7aab98a 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/AlertDao.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/AlertDao.java @@ -37,6 +37,7 @@ import org.apache.dolphinscheduler.dao.mapper.AlertSendStatusMapper; import org.apache.commons.codec.digest.DigestUtils; +import org.apache.commons.collections4.CollectionUtils; import java.time.LocalDateTime; import java.time.ZoneId; @@ -59,6 +60,8 @@ @Component public class AlertDao { + private static final int QUERY_ALERT_THRESHOLD = 100; + @Value("${alert.alarm-suppression.crash:60}") private Integer crashAlarmSuppression; @@ -136,12 +139,19 @@ public int addAlertSendStatus(AlertStatus sendStatus, String log, int alertId, i return alertSendStatusMapper.insert(alertSendStatus); } + public int insertAlertSendStatus(List alertSendStatuses) { + if (CollectionUtils.isEmpty(alertSendStatuses)) { + return 0; + } + return alertSendStatusMapper.batchInsert(alertSendStatuses); + } + /** * MasterServer or WorkerServer stopped * * @param alertGroupId alertGroupId - * @param host host - * @param serverType serverType + * @param host host + * @param serverType serverType */ public void sendServerStoppedAlert(int alertGroupId, String host, String serverType) { ServerAlertContent serverStopAlertContent = ServerAlertContent.newBuilder().type(serverType) @@ -253,9 +263,7 @@ public void sendTaskTimeoutAlert(ProcessInstance processInstance, TaskInstance t * List alerts that are pending for execution */ public List listPendingAlerts() { - LambdaQueryWrapper wrapper = new QueryWrapper<>(new Alert()).lambda() - .eq(Alert::getAlertStatus, AlertStatus.WAIT_EXECUTION); - return alertMapper.selectList(wrapper); + return alertMapper.listingAlertByStatus(AlertStatus.WAIT_EXECUTION.getCode(), QUERY_ALERT_THRESHOLD); } public List listAlerts(int processInstanceId) { diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/Alert.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/Alert.java index 4b27a55b5a4a..421f3f76b700 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/Alert.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/Alert.java @@ -24,15 +24,24 @@ import java.util.Date; import java.util.HashMap; import java.util.Map; -import java.util.Objects; + +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; import com.baomidou.mybatisplus.annotation.IdType; import com.baomidou.mybatisplus.annotation.TableField; import com.baomidou.mybatisplus.annotation.TableId; import com.baomidou.mybatisplus.annotation.TableName; +@Data +@Builder +@NoArgsConstructor +@AllArgsConstructor @TableName("t_ds_alert") public class Alert { + /** * primary key */ @@ -117,171 +126,4 @@ public class Alert { @TableField(exist = false) private Map info = new HashMap<>(); - public Alert() { - } - - public Integer getId() { - return id; - } - - public void setId(Integer id) { - this.id = id; - } - - public String getSign() { - return sign; - } - - public void setSign(String sign) { - this.sign = sign; - } - - public String getTitle() { - return title; - } - - public void setTitle(String title) { - this.title = title; - } - - public String getContent() { - return content; - } - - public void setContent(String content) { - this.content = content; - } - - public AlertStatus getAlertStatus() { - return alertStatus; - } - - public void setAlertStatus(AlertStatus alertStatus) { - this.alertStatus = alertStatus; - } - - public String getLog() { - return log; - } - - public void setLog(String log) { - this.log = log; - } - - public Integer getAlertGroupId() { - return alertGroupId; - } - - public void setAlertGroupId(Integer alertGroupId) { - this.alertGroupId = alertGroupId; - } - - public Date getCreateTime() { - return createTime; - } - - public void setCreateTime(Date createTime) { - this.createTime = createTime; - } - - public Date getUpdateTime() { - return updateTime; - } - - public void setUpdateTime(Date updateTime) { - this.updateTime = updateTime; - } - - public Map getInfo() { - return info; - } - - public void setInfo(Map info) { - this.info = info; - } - - public WarningType getWarningType() { - return warningType; - } - - public void setWarningType(WarningType warningType) { - this.warningType = warningType; - } - - public Long getProjectCode() { - return projectCode; - } - - public void setProjectCode(Long projectCode) { - this.projectCode = projectCode; - } - - public Long getProcessDefinitionCode() { - return processDefinitionCode; - } - - public void setProcessDefinitionCode(Long processDefinitionCode) { - this.processDefinitionCode = processDefinitionCode; - } - - public Integer getProcessInstanceId() { - return processInstanceId; - } - - public void setProcessInstanceId(Integer processInstanceId) { - this.processInstanceId = processInstanceId; - } - - public AlertType getAlertType() { - return alertType; - } - - public void setAlertType(AlertType alertType) { - this.alertType = alertType; - } - - @Override - public boolean equals(Object o) { - if (this == o) { - return true; - } - if (o == null || getClass() != o.getClass()) { - return false; - } - Alert alert = (Alert) o; - return Objects.equals(id, alert.id) - && Objects.equals(alertGroupId, alert.alertGroupId) - && Objects.equals(sign, alert.sign) - && Objects.equals(title, alert.title) - && Objects.equals(content, alert.content) - && alertStatus == alert.alertStatus - && warningType == alert.warningType - && Objects.equals(log, alert.log) - && Objects.equals(createTime, alert.createTime) - && Objects.equals(updateTime, alert.updateTime) - && Objects.equals(info, alert.info) - ; - } - - @Override - public int hashCode() { - return Objects.hash(id, sign, title, content, alertStatus, warningType, log, alertGroupId, createTime, updateTime, info); - } - - @Override - public String toString() { - return "Alert{" - + "id=" + id - + ", sign='" + sign + '\'' - + ", title='" + title + '\'' - + ", content='" + content + '\'' - + ", alertStatus=" + alertStatus - + ", warningType=" + warningType - + ", log='" + log + '\'' - + ", alertGroupId=" + alertGroupId - + ", createTime=" + createTime - + ", updateTime=" + updateTime - + ", info=" + info - + '}'; - } } diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/AlertSendStatus.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/AlertSendStatus.java index 32a291104a0a..77418030258f 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/AlertSendStatus.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/AlertSendStatus.java @@ -21,15 +21,20 @@ import java.util.Date; +import lombok.AllArgsConstructor; +import lombok.Builder; import lombok.Data; +import lombok.NoArgsConstructor; import com.baomidou.mybatisplus.annotation.IdType; import com.baomidou.mybatisplus.annotation.TableField; import com.baomidou.mybatisplus.annotation.TableId; import com.baomidou.mybatisplus.annotation.TableName; -import com.google.common.base.Objects; @Data +@Builder +@AllArgsConstructor +@NoArgsConstructor @TableName("t_ds_alert_send_status") public class AlertSendStatus { @@ -69,20 +74,4 @@ public class AlertSendStatus { @TableField("create_time") private Date createTime; - @Override - public boolean equals(Object o) { - if (this == o) { - return true; - } - if (o == null || getClass() != o.getClass()) { - return false; - } - AlertSendStatus that = (AlertSendStatus) o; - return alertId == that.alertId && alertPluginInstanceId == that.alertPluginInstanceId; - } - - @Override - public int hashCode() { - return Objects.hashCode(alertId, alertPluginInstanceId); - } } diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/AlertMapper.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/AlertMapper.java index 1b6f29d43cbe..d5563be5187e 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/AlertMapper.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/AlertMapper.java @@ -23,6 +23,7 @@ import org.apache.ibatis.annotations.Param; import java.util.Date; +import java.util.List; import com.baomidou.mybatisplus.core.mapper.BaseMapper; @@ -32,10 +33,16 @@ @Mapper public interface AlertMapper extends BaseMapper { + /** + * Query the alert by alertStatus and return limit with default sort. + */ + List listingAlertByStatus(@Param("alertStatus") int alertStatus, @Param("limit") int limit); + /** * Insert server crash alert *

This method will ensure that there is at most one unsent alert which has the same content in the database. */ - void insertAlertWhenServerCrash(@Param("alert") Alert alert, @Param("crashAlarmSuppressionStartTime") Date crashAlarmSuppressionStartTime); + void insertAlertWhenServerCrash(@Param("alert") Alert alert, + @Param("crashAlarmSuppressionStartTime") Date crashAlarmSuppressionStartTime); } diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/AlertSendStatusMapper.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/AlertSendStatusMapper.java index 62be65642383..b1a5fea153ff 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/AlertSendStatusMapper.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/AlertSendStatusMapper.java @@ -19,7 +19,11 @@ import org.apache.dolphinscheduler.dao.entity.AlertSendStatus; +import java.util.List; + import com.baomidou.mybatisplus.core.mapper.BaseMapper; public interface AlertSendStatusMapper extends BaseMapper { + + int batchInsert(List alertSendStatuses); } diff --git a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/AlertMapper.xml b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/AlertMapper.xml index 1e83b4965b78..274c88cf02b0 100644 --- a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/AlertMapper.xml +++ b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/AlertMapper.xml @@ -18,13 +18,38 @@ + + id + , sign, title, content, alert_status, warning_type, log, alertgroup_id, create_time, update_time, project_code, + process_definition_code, process_instance_id, alert_type + + - insert into t_ds_alert(sign, title, content, alert_status, warning_type, log, alertgroup_id, create_time, update_time, alert_type) - SELECT #{alert.sign}, #{alert.title}, #{alert.content}, #{alert.alertStatus.code}, #{alert.warningType.code}, - #{alert.log}, #{alert.alertGroupId}, #{alert.createTime}, #{alert.updateTime}, #{alert.alertType.code} + insert into t_ds_alert(sign, title, content, alert_status, warning_type, log, alertgroup_id, create_time, + update_time, alert_type) + SELECT #{alert.sign}, + #{alert.title}, + #{alert.content}, + #{alert.alertStatus.code}, + #{alert.warningType.code}, + #{alert.log}, + #{alert.alertGroupId}, + #{alert.createTime}, + #{alert.updateTime}, + #{alert.alertType.code} from t_ds_alert - where create_time >= #{crashAlarmSuppressionStartTime} and sign = #{alert.sign} and alert_status = #{alert.alertStatus.code} + where create_time >= #{crashAlarmSuppressionStartTime} + and sign = #{alert.sign} + and alert_status = #{alert.alertStatus.code} having count(*) = 0 + + diff --git a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/AlertSendStatusMapper.xml b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/AlertSendStatusMapper.xml new file mode 100644 index 000000000000..248d7b05731f --- /dev/null +++ b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/AlertSendStatusMapper.xml @@ -0,0 +1,35 @@ + + + + + + + + insert into t_ds_alert_send_status (alert_id, alert_plugin_instance_id, send_status, log, create_time) + values + + #{alertSendStatus.alertId}, + #{alertSendStatus.alertPluginInstanceId}, + #{alertSendStatus.sendStatus}, + #{alertSendStatus.log}, + #{alertSendStatus.createTime}, + + + + +