Skip to content

Commit

Permalink
Fix alert_send_status may throw duplicate key exception and add limit…
Browse files Browse the repository at this point in the history
… for query alert (apache#11953)
  • Loading branch information
ruanwenjun authored and xdu-chenrj committed Sep 17, 2022
1 parent 9195b94 commit 5777ec4
Show file tree
Hide file tree
Showing 9 changed files with 143 additions and 209 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,6 @@

package org.apache.dolphinscheduler.alert;

import com.google.common.collect.Lists;
import org.apache.commons.collections.CollectionUtils;
import org.apache.dolphinscheduler.alert.api.AlertChannel;
import org.apache.dolphinscheduler.alert.api.AlertConstants;
import org.apache.dolphinscheduler.alert.api.AlertData;
Expand All @@ -34,20 +32,28 @@
import org.apache.dolphinscheduler.dao.AlertDao;
import org.apache.dolphinscheduler.dao.entity.Alert;
import org.apache.dolphinscheduler.dao.entity.AlertPluginInstance;
import org.apache.dolphinscheduler.dao.entity.AlertSendStatus;
import org.apache.dolphinscheduler.remote.command.alert.AlertSendResponseCommand;
import org.apache.dolphinscheduler.remote.command.alert.AlertSendResponseResult;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;

import javax.annotation.Nullable;
import org.apache.commons.collections.CollectionUtils;

import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

import javax.annotation.Nullable;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;

import com.google.common.collect.Lists;

@Service
public final class AlertSenderService extends Thread {

Expand All @@ -71,23 +77,29 @@ public synchronized void start() {

@Override
public void run() {
logger.info("alert sender started");
logger.info("Alert sender thread started");
while (!ServerLifeCycleManager.isStopped()) {
try {
List<Alert> alerts = alertDao.listPendingAlerts();
if (CollectionUtils.isEmpty(alerts)) {
logger.debug("There is not waiting alerts");
continue;
}
AlertServerMetrics.registerPendingAlertGauge(alerts::size);
this.send(alerts);
ThreadUtils.sleep(Constants.SLEEP_TIME_MILLIS * 5L);
} catch (Exception e) {
logger.error("alert sender thread error", e);
logger.error("Alert sender thread meet an exception", e);
} finally {
ThreadUtils.sleep(Constants.SLEEP_TIME_MILLIS * 5L);
}
}
logger.info("Alert sender thread stopped");
}

public void send(List<Alert> alerts) {
for (Alert alert : alerts) {
// get alert group from alert
int alertId = Optional.ofNullable(alert.getId()).orElse(0);
int alertId = alert.getId();
int alertGroupId = Optional.ofNullable(alert.getAlertGroupId()).orElse(0);
List<AlertPluginInstance> alertInstanceList = alertDao.listInstanceByAlertGroupId(alertGroupId);
if (CollectionUtils.isEmpty(alertInstanceList)) {
Expand All @@ -107,16 +119,23 @@ public void send(List<Alert> alerts) {
.build();

int sendSuccessCount = 0;
List<AlertSendStatus> alertSendStatuses = new ArrayList<>();
List<AlertResult> alertResults = new ArrayList<>();
for (AlertPluginInstance instance : alertInstanceList) {
AlertResult alertResult = this.alertResultHandler(instance, alertData);
if (alertResult != null) {
AlertStatus sendStatus = Boolean.parseBoolean(String.valueOf(alertResult.getStatus()))
AlertStatus sendStatus = Boolean.parseBoolean(alertResult.getStatus())
? AlertStatus.EXECUTION_SUCCESS
: AlertStatus.EXECUTION_FAILURE;
alertDao.addAlertSendStatus(sendStatus, JSONUtils.toJsonString(alertResult), alertId,
instance.getId());
if (sendStatus.equals(AlertStatus.EXECUTION_SUCCESS)) {
AlertSendStatus alertSendStatus = AlertSendStatus.builder()
.alertId(alertId)
.alertPluginInstanceId(instance.getId())
.sendStatus(sendStatus)
.log(JSONUtils.toJsonString(alertResult))
.createTime(new Date())
.build();
alertSendStatuses.add(alertSendStatus);
if (AlertStatus.EXECUTION_SUCCESS.equals(sendStatus)) {
sendSuccessCount++;
AlertServerMetrics.incAlertSuccessCount();
} else {
Expand All @@ -131,7 +150,11 @@ public void send(List<Alert> alerts) {
} else if (sendSuccessCount < alertInstanceList.size()) {
alertStatus = AlertStatus.EXECUTION_PARTIAL_SUCCESS;
}
// we update the alert first to avoid duplicate key in alertSendStatus
// this may loss the alertSendStatus if the server restart
// todo: use transaction to update these two table
alertDao.updateAlert(alertStatus, JSONUtils.toJsonString(alertResults), alertId);
alertDao.insertAlertSendStatus(alertSendStatuses);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,7 @@ public void testRun() {
String content = "alert mail test content";
List<Alert> alertList = new ArrayList<>();
Alert alert = new Alert();
alert.setId(1);
alert.setAlertGroupId(alertGroupId);
alert.setTitle(title);
alert.setContent(content);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import org.apache.dolphinscheduler.dao.mapper.AlertSendStatusMapper;

import org.apache.commons.codec.digest.DigestUtils;
import org.apache.commons.collections4.CollectionUtils;

import java.time.LocalDateTime;
import java.time.ZoneId;
Expand All @@ -59,6 +60,8 @@
@Component
public class AlertDao {

private static final int QUERY_ALERT_THRESHOLD = 100;

@Value("${alert.alarm-suppression.crash:60}")
private Integer crashAlarmSuppression;

Expand Down Expand Up @@ -136,12 +139,19 @@ public int addAlertSendStatus(AlertStatus sendStatus, String log, int alertId, i
return alertSendStatusMapper.insert(alertSendStatus);
}

public int insertAlertSendStatus(List<AlertSendStatus> alertSendStatuses) {
if (CollectionUtils.isEmpty(alertSendStatuses)) {
return 0;
}
return alertSendStatusMapper.batchInsert(alertSendStatuses);
}

/**
* MasterServer or WorkerServer stopped
*
* @param alertGroupId alertGroupId
* @param host host
* @param serverType serverType
* @param host host
* @param serverType serverType
*/
public void sendServerStoppedAlert(int alertGroupId, String host, String serverType) {
ServerAlertContent serverStopAlertContent = ServerAlertContent.newBuilder().type(serverType)
Expand Down Expand Up @@ -253,9 +263,7 @@ public void sendTaskTimeoutAlert(ProcessInstance processInstance, TaskInstance t
* List alerts that are pending for execution
*/
public List<Alert> listPendingAlerts() {
LambdaQueryWrapper<Alert> wrapper = new QueryWrapper<>(new Alert()).lambda()
.eq(Alert::getAlertStatus, AlertStatus.WAIT_EXECUTION);
return alertMapper.selectList(wrapper);
return alertMapper.listingAlertByStatus(AlertStatus.WAIT_EXECUTION.getCode(), QUERY_ALERT_THRESHOLD);
}

public List<Alert> listAlerts(int processInstanceId) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,15 +24,24 @@
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;

import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableField;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;

@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
@TableName("t_ds_alert")
public class Alert {

/**
* primary key
*/
Expand Down Expand Up @@ -117,171 +126,4 @@ public class Alert {
@TableField(exist = false)
private Map<String, Object> info = new HashMap<>();

public Alert() {
}

public Integer getId() {
return id;
}

public void setId(Integer id) {
this.id = id;
}

public String getSign() {
return sign;
}

public void setSign(String sign) {
this.sign = sign;
}

public String getTitle() {
return title;
}

public void setTitle(String title) {
this.title = title;
}

public String getContent() {
return content;
}

public void setContent(String content) {
this.content = content;
}

public AlertStatus getAlertStatus() {
return alertStatus;
}

public void setAlertStatus(AlertStatus alertStatus) {
this.alertStatus = alertStatus;
}

public String getLog() {
return log;
}

public void setLog(String log) {
this.log = log;
}

public Integer getAlertGroupId() {
return alertGroupId;
}

public void setAlertGroupId(Integer alertGroupId) {
this.alertGroupId = alertGroupId;
}

public Date getCreateTime() {
return createTime;
}

public void setCreateTime(Date createTime) {
this.createTime = createTime;
}

public Date getUpdateTime() {
return updateTime;
}

public void setUpdateTime(Date updateTime) {
this.updateTime = updateTime;
}

public Map<String, Object> getInfo() {
return info;
}

public void setInfo(Map<String, Object> info) {
this.info = info;
}

public WarningType getWarningType() {
return warningType;
}

public void setWarningType(WarningType warningType) {
this.warningType = warningType;
}

public Long getProjectCode() {
return projectCode;
}

public void setProjectCode(Long projectCode) {
this.projectCode = projectCode;
}

public Long getProcessDefinitionCode() {
return processDefinitionCode;
}

public void setProcessDefinitionCode(Long processDefinitionCode) {
this.processDefinitionCode = processDefinitionCode;
}

public Integer getProcessInstanceId() {
return processInstanceId;
}

public void setProcessInstanceId(Integer processInstanceId) {
this.processInstanceId = processInstanceId;
}

public AlertType getAlertType() {
return alertType;
}

public void setAlertType(AlertType alertType) {
this.alertType = alertType;
}

@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
Alert alert = (Alert) o;
return Objects.equals(id, alert.id)
&& Objects.equals(alertGroupId, alert.alertGroupId)
&& Objects.equals(sign, alert.sign)
&& Objects.equals(title, alert.title)
&& Objects.equals(content, alert.content)
&& alertStatus == alert.alertStatus
&& warningType == alert.warningType
&& Objects.equals(log, alert.log)
&& Objects.equals(createTime, alert.createTime)
&& Objects.equals(updateTime, alert.updateTime)
&& Objects.equals(info, alert.info)
;
}

@Override
public int hashCode() {
return Objects.hash(id, sign, title, content, alertStatus, warningType, log, alertGroupId, createTime, updateTime, info);
}

@Override
public String toString() {
return "Alert{"
+ "id=" + id
+ ", sign='" + sign + '\''
+ ", title='" + title + '\''
+ ", content='" + content + '\''
+ ", alertStatus=" + alertStatus
+ ", warningType=" + warningType
+ ", log='" + log + '\''
+ ", alertGroupId=" + alertGroupId
+ ", createTime=" + createTime
+ ", updateTime=" + updateTime
+ ", info=" + info
+ '}';
}
}
Loading

0 comments on commit 5777ec4

Please sign in to comment.