Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix alert_send_status may throw duplicate key exception and add limit for query alert #11953

Merged
merged 1 commit into from
Sep 16, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,6 @@

package org.apache.dolphinscheduler.alert;

import com.google.common.collect.Lists;
import org.apache.commons.collections.CollectionUtils;
import org.apache.dolphinscheduler.alert.api.AlertChannel;
import org.apache.dolphinscheduler.alert.api.AlertConstants;
import org.apache.dolphinscheduler.alert.api.AlertData;
Expand All @@ -34,20 +32,28 @@
import org.apache.dolphinscheduler.dao.AlertDao;
import org.apache.dolphinscheduler.dao.entity.Alert;
import org.apache.dolphinscheduler.dao.entity.AlertPluginInstance;
import org.apache.dolphinscheduler.dao.entity.AlertSendStatus;
import org.apache.dolphinscheduler.remote.command.alert.AlertSendResponseCommand;
import org.apache.dolphinscheduler.remote.command.alert.AlertSendResponseResult;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;

import javax.annotation.Nullable;
import org.apache.commons.collections.CollectionUtils;

import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

import javax.annotation.Nullable;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;

import com.google.common.collect.Lists;

@Service
public final class AlertSenderService extends Thread {

Expand All @@ -71,23 +77,29 @@ public synchronized void start() {

@Override
public void run() {
logger.info("alert sender started");
logger.info("Alert sender thread started");
while (!ServerLifeCycleManager.isStopped()) {
try {
List<Alert> alerts = alertDao.listPendingAlerts();
if (CollectionUtils.isEmpty(alerts)) {
logger.debug("There is not waiting alerts");
continue;
}
AlertServerMetrics.registerPendingAlertGauge(alerts::size);
this.send(alerts);
ThreadUtils.sleep(Constants.SLEEP_TIME_MILLIS * 5L);
} catch (Exception e) {
logger.error("alert sender thread error", e);
logger.error("Alert sender thread meet an exception", e);
} finally {
ThreadUtils.sleep(Constants.SLEEP_TIME_MILLIS * 5L);
}
}
logger.info("Alert sender thread stopped");
}

public void send(List<Alert> alerts) {
for (Alert alert : alerts) {
// get alert group from alert
int alertId = Optional.ofNullable(alert.getId()).orElse(0);
int alertId = alert.getId();
int alertGroupId = Optional.ofNullable(alert.getAlertGroupId()).orElse(0);
List<AlertPluginInstance> alertInstanceList = alertDao.listInstanceByAlertGroupId(alertGroupId);
if (CollectionUtils.isEmpty(alertInstanceList)) {
Expand All @@ -107,16 +119,23 @@ public void send(List<Alert> alerts) {
.build();

int sendSuccessCount = 0;
List<AlertSendStatus> alertSendStatuses = new ArrayList<>();
List<AlertResult> alertResults = new ArrayList<>();
for (AlertPluginInstance instance : alertInstanceList) {
AlertResult alertResult = this.alertResultHandler(instance, alertData);
if (alertResult != null) {
AlertStatus sendStatus = Boolean.parseBoolean(String.valueOf(alertResult.getStatus()))
AlertStatus sendStatus = Boolean.parseBoolean(alertResult.getStatus())
? AlertStatus.EXECUTION_SUCCESS
: AlertStatus.EXECUTION_FAILURE;
alertDao.addAlertSendStatus(sendStatus, JSONUtils.toJsonString(alertResult), alertId,
instance.getId());
if (sendStatus.equals(AlertStatus.EXECUTION_SUCCESS)) {
AlertSendStatus alertSendStatus = AlertSendStatus.builder()
.alertId(alertId)
.alertPluginInstanceId(instance.getId())
.sendStatus(sendStatus)
.log(JSONUtils.toJsonString(alertResult))
.createTime(new Date())
.build();
alertSendStatuses.add(alertSendStatus);
if (AlertStatus.EXECUTION_SUCCESS.equals(sendStatus)) {
sendSuccessCount++;
AlertServerMetrics.incAlertSuccessCount();
} else {
Expand All @@ -131,7 +150,11 @@ public void send(List<Alert> alerts) {
} else if (sendSuccessCount < alertInstanceList.size()) {
alertStatus = AlertStatus.EXECUTION_PARTIAL_SUCCESS;
}
// we update the alert first to avoid duplicate key in alertSendStatus
// this may loss the alertSendStatus if the server restart
// todo: use transaction to update these two table
alertDao.updateAlert(alertStatus, JSONUtils.toJsonString(alertResults), alertId);
alertDao.insertAlertSendStatus(alertSendStatuses);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,7 @@ public void testRun() {
String content = "alert mail test content";
List<Alert> alertList = new ArrayList<>();
Alert alert = new Alert();
alert.setId(1);
alert.setAlertGroupId(alertGroupId);
alert.setTitle(title);
alert.setContent(content);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import org.apache.dolphinscheduler.dao.mapper.AlertSendStatusMapper;

import org.apache.commons.codec.digest.DigestUtils;
import org.apache.commons.collections4.CollectionUtils;

import java.time.LocalDateTime;
import java.time.ZoneId;
Expand All @@ -59,6 +60,8 @@
@Component
public class AlertDao {

private static final int QUERY_ALERT_THRESHOLD = 100;

@Value("${alert.alarm-suppression.crash:60}")
private Integer crashAlarmSuppression;

Expand Down Expand Up @@ -136,12 +139,19 @@ public int addAlertSendStatus(AlertStatus sendStatus, String log, int alertId, i
return alertSendStatusMapper.insert(alertSendStatus);
}

public int insertAlertSendStatus(List<AlertSendStatus> alertSendStatuses) {
if (CollectionUtils.isEmpty(alertSendStatuses)) {
return 0;
}
return alertSendStatusMapper.batchInsert(alertSendStatuses);
}

/**
* MasterServer or WorkerServer stopped
*
* @param alertGroupId alertGroupId
* @param host host
* @param serverType serverType
* @param host host
* @param serverType serverType
*/
public void sendServerStoppedAlert(int alertGroupId, String host, String serverType) {
ServerAlertContent serverStopAlertContent = ServerAlertContent.newBuilder().type(serverType)
Expand Down Expand Up @@ -253,9 +263,7 @@ public void sendTaskTimeoutAlert(ProcessInstance processInstance, TaskInstance t
* List alerts that are pending for execution
*/
public List<Alert> listPendingAlerts() {
LambdaQueryWrapper<Alert> wrapper = new QueryWrapper<>(new Alert()).lambda()
.eq(Alert::getAlertStatus, AlertStatus.WAIT_EXECUTION);
return alertMapper.selectList(wrapper);
return alertMapper.listingAlertByStatus(AlertStatus.WAIT_EXECUTION.getCode(), QUERY_ALERT_THRESHOLD);
}

public List<Alert> listAlerts(int processInstanceId) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,15 +24,24 @@
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;

import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableField;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;

@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
@TableName("t_ds_alert")
public class Alert {

/**
* primary key
*/
Expand Down Expand Up @@ -117,171 +126,4 @@ public class Alert {
@TableField(exist = false)
private Map<String, Object> info = new HashMap<>();

public Alert() {
}

public Integer getId() {
return id;
}

public void setId(Integer id) {
this.id = id;
}

public String getSign() {
return sign;
}

public void setSign(String sign) {
this.sign = sign;
}

public String getTitle() {
return title;
}

public void setTitle(String title) {
this.title = title;
}

public String getContent() {
return content;
}

public void setContent(String content) {
this.content = content;
}

public AlertStatus getAlertStatus() {
return alertStatus;
}

public void setAlertStatus(AlertStatus alertStatus) {
this.alertStatus = alertStatus;
}

public String getLog() {
return log;
}

public void setLog(String log) {
this.log = log;
}

public Integer getAlertGroupId() {
return alertGroupId;
}

public void setAlertGroupId(Integer alertGroupId) {
this.alertGroupId = alertGroupId;
}

public Date getCreateTime() {
return createTime;
}

public void setCreateTime(Date createTime) {
this.createTime = createTime;
}

public Date getUpdateTime() {
return updateTime;
}

public void setUpdateTime(Date updateTime) {
this.updateTime = updateTime;
}

public Map<String, Object> getInfo() {
return info;
}

public void setInfo(Map<String, Object> info) {
this.info = info;
}

public WarningType getWarningType() {
return warningType;
}

public void setWarningType(WarningType warningType) {
this.warningType = warningType;
}

public Long getProjectCode() {
return projectCode;
}

public void setProjectCode(Long projectCode) {
this.projectCode = projectCode;
}

public Long getProcessDefinitionCode() {
return processDefinitionCode;
}

public void setProcessDefinitionCode(Long processDefinitionCode) {
this.processDefinitionCode = processDefinitionCode;
}

public Integer getProcessInstanceId() {
return processInstanceId;
}

public void setProcessInstanceId(Integer processInstanceId) {
this.processInstanceId = processInstanceId;
}

public AlertType getAlertType() {
return alertType;
}

public void setAlertType(AlertType alertType) {
this.alertType = alertType;
}

@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
Alert alert = (Alert) o;
return Objects.equals(id, alert.id)
&& Objects.equals(alertGroupId, alert.alertGroupId)
&& Objects.equals(sign, alert.sign)
&& Objects.equals(title, alert.title)
&& Objects.equals(content, alert.content)
&& alertStatus == alert.alertStatus
&& warningType == alert.warningType
&& Objects.equals(log, alert.log)
&& Objects.equals(createTime, alert.createTime)
&& Objects.equals(updateTime, alert.updateTime)
&& Objects.equals(info, alert.info)
;
}

@Override
public int hashCode() {
return Objects.hash(id, sign, title, content, alertStatus, warningType, log, alertGroupId, createTime, updateTime, info);
}

@Override
public String toString() {
return "Alert{"
+ "id=" + id
+ ", sign='" + sign + '\''
+ ", title='" + title + '\''
+ ", content='" + content + '\''
+ ", alertStatus=" + alertStatus
+ ", warningType=" + warningType
+ ", log='" + log + '\''
+ ", alertGroupId=" + alertGroupId
+ ", createTime=" + createTime
+ ", updateTime=" + updateTime
+ ", info=" + info
+ '}';
}
}
Loading