diff --git a/alerter/src/main/java/org/apache/hertzbeat/alert/calculate/CollectorAlarmHandler.java b/alerter/src/main/java/org/apache/hertzbeat/alert/calculate/CollectorAlarmHandler.java new file mode 100644 index 00000000000..447e2991743 --- /dev/null +++ b/alerter/src/main/java/org/apache/hertzbeat/alert/calculate/CollectorAlarmHandler.java @@ -0,0 +1,205 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hertzbeat.alert.calculate; + +import jakarta.persistence.criteria.Predicate; +import lombok.extern.slf4j.Slf4j; +import org.apache.hertzbeat.alert.AlerterWorkerPool; +import org.apache.hertzbeat.alert.dao.AlertCollectorDao; +import org.apache.hertzbeat.alert.reduce.AlarmCommonReduce; +import org.apache.hertzbeat.alert.service.AlertService; +import org.apache.hertzbeat.common.constants.CommonConstants; +import org.apache.hertzbeat.common.entity.alerter.Alert; +import org.apache.hertzbeat.common.entity.manager.Collector; +import org.apache.hertzbeat.common.support.event.CollectorDeletedEvent; +import org.apache.hertzbeat.common.support.event.SystemConfigChangeEvent; +import org.apache.hertzbeat.common.util.ResourceBundleUtil; +import org.springframework.context.event.EventListener; +import org.springframework.data.jpa.domain.Specification; +import org.springframework.stereotype.Component; +import org.springframework.util.CollectionUtils; + +import java.util.HashMap; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.ResourceBundle; +import java.util.Objects; +import java.util.concurrent.ConcurrentHashMap; + +import static org.apache.hertzbeat.common.constants.CommonConstants.ALERT_STATUS_CODE_PENDING; + +/** + * handle collector alarm + */ +@Component +@Slf4j +public class CollectorAlarmHandler { + + private final Map offlineAlertMap; + + private final AlertService alertService; + + private final AlertCollectorDao alertCollectorDao; + + private final AlarmCommonReduce alarmCommonReduce; + + private final AlerterWorkerPool workerPool; + + private ResourceBundle bundle; + + public CollectorAlarmHandler(AlarmCommonReduce alarmCommonReduce, AlertService alertService, AlertCollectorDao alertCollectorDao, + AlerterWorkerPool workerPool) { + this.offlineAlertMap = new ConcurrentHashMap<>(16); + this.alarmCommonReduce = alarmCommonReduce; + this.alertService = alertService; + this.alertCollectorDao = alertCollectorDao; + this.workerPool = workerPool; + this.bundle = ResourceBundleUtil.getBundle("alerter"); + List collectors = this.alertCollectorDao.findCollectorsByStatus(CommonConstants.COLLECTOR_STATUS_OFFLINE); + if (!CollectionUtils.isEmpty(collectors)) { + for (Collector collector : collectors) { + Map tags = new HashMap<>(8); + tags.put(CommonConstants.TAG_COLLECTOR_ID, String.valueOf(collector.getId())); + tags.put(CommonConstants.TAG_COLLECTOR_NAME, collector.getName()); + this.offlineAlertMap.put(collector.getName(), + Alert.builder().tags(tags).target(CommonConstants.AVAILABILITY).status(ALERT_STATUS_CODE_PENDING).build()); + } + } + } + + /** + * handle collector online + * + * @param identity collector name + */ + public void online(final String identity) { + Collector collector = alertCollectorDao.findCollectorByName(identity); + if (collector == null) { + return; + } + long currentTimeMill = System.currentTimeMillis(); + Alert preAlert = offlineAlertMap.remove(identity); + if (preAlert != null) { + Map tags = preAlert.getTags(); + tags.put(CommonConstants.TAG_COLLECTOR_HOST, collector.getIp()); + tags.put(CommonConstants.TAG_COLLECTOR_VERSION, collector.getVersion()); + String content = this.bundle.getString("alerter.availability.collector.recover"); + Alert resumeAlert = Alert.builder() + .tags(tags) + .target(CommonConstants.AVAILABILITY_COLLECTOR) + .content(content) + .priority(CommonConstants.ALERT_PRIORITY_CODE_WARNING) + .status(CommonConstants.ALERT_STATUS_CODE_RESTORED) + .firstAlarmTime(currentTimeMill) + .lastAlarmTime(preAlert.getLastAlarmTime()) + .build(); + workerPool.executeJob(() -> recoverAlarm(identity, resumeAlert)); + } + } + + private void recoverAlarm(String identity, Alert restoreAlert) { + List alertIds = queryAvailabilityAlerts(identity, restoreAlert) + .stream() + .filter(alert -> Objects.equals(alert.getTags().get(CommonConstants.TAG_COLLECTOR_NAME), identity)) + .map(Alert::getId) + .toList(); + + if (!alertIds.isEmpty()) { + alertService.editAlertStatus(CommonConstants.ALERT_STATUS_CODE_SOLVED, alertIds); + + // Recovery notifications are generated only after an alarm has occurred + alarmCommonReduce.reduceAndSendAlarm(restoreAlert); + } + } + + private List queryAvailabilityAlerts(String identity, Alert restoreAlert) { + //create query condition + Specification specification = (root, query, criteriaBuilder) -> { + List andList = new ArrayList<>(); + + Predicate predicateTags = criteriaBuilder.like(root.get("tags").as(String.class), "%" + identity + "%"); + andList.add(predicateTags); + + Predicate predicatePriority = criteriaBuilder.equal(root.get("priority"), CommonConstants.ALERT_PRIORITY_CODE_EMERGENCY); + andList.add(predicatePriority); + + Predicate predicateStatus = criteriaBuilder.equal(root.get("status"), ALERT_STATUS_CODE_PENDING); + andList.add(predicateStatus); + + Predicate predicateAlertTime = criteriaBuilder.lessThanOrEqualTo(root.get("lastAlarmTime"), restoreAlert.getLastAlarmTime()); + andList.add(predicateAlertTime); + + Predicate[] predicates = new Predicate[andList.size()]; + return criteriaBuilder.and(andList.toArray(predicates)); + }; + + //query results + return alertService.getAlerts(specification); + } + + /** + * handle collector offline + * + * @param identity collector name + */ + public void offline(final String identity) { + Collector collector = alertCollectorDao.findCollectorByName(identity); + if (collector == null) { + return; + } + long currentTimeMill = System.currentTimeMillis(); + + Alert preAlert = offlineAlertMap.get(identity); + if (preAlert == null) { + Map tags = new HashMap<>(); + tags.put(CommonConstants.TAG_COLLECTOR_ID, String.valueOf(collector.getId())); + tags.put(CommonConstants.TAG_COLLECTOR_NAME, collector.getName()); + tags.put(CommonConstants.TAG_COLLECTOR_HOST, collector.getIp()); + tags.put(CommonConstants.TAG_COLLECTOR_VERSION, collector.getVersion()); + tags.put(CommonConstants.TAG_CODE, "OFFLINE"); + + String content = this.bundle.getString("alerter.availability.collector.offline"); + Alert alert = Alert.builder() + .tags(tags) + .priority(CommonConstants.ALERT_PRIORITY_CODE_EMERGENCY) + .status(CommonConstants.ALERT_STATUS_CODE_PENDING) + .target(CommonConstants.AVAILABILITY_COLLECTOR) + .content(content) + .firstAlarmTime(currentTimeMill) + .lastAlarmTime(currentTimeMill) + .times(1) + .build(); + this.offlineAlertMap.put(identity, alert); + alarmCommonReduce.reduceAndSendAlarm(alert); + } + } + + + @EventListener(SystemConfigChangeEvent.class) + public void onSystemConfigChangeEvent(SystemConfigChangeEvent event) { + log.info("calculate alarm receive system config change event: {}.", event.getSource()); + this.bundle = ResourceBundleUtil.getBundle("alerter"); + } + + @EventListener(CollectorDeletedEvent.class) + public void onCollectorDeletedEvent(CollectorDeletedEvent event) { + log.info("collector alarm handler receive collector {} has been deleted.", event.getIdentity()); + offlineAlertMap.remove(event.getIdentity()); + } +} diff --git a/alerter/src/main/java/org/apache/hertzbeat/alert/dao/AlertCollectorDao.java b/alerter/src/main/java/org/apache/hertzbeat/alert/dao/AlertCollectorDao.java new file mode 100644 index 00000000000..f9fda0d189c --- /dev/null +++ b/alerter/src/main/java/org/apache/hertzbeat/alert/dao/AlertCollectorDao.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hertzbeat.alert.dao; + +import org.apache.hertzbeat.common.entity.manager.Collector; +import org.springframework.data.jpa.repository.JpaRepository; +import org.springframework.data.jpa.repository.JpaSpecificationExecutor; + +import java.util.List; + +/** + * Alert Collector Dao + */ +public interface AlertCollectorDao extends JpaRepository, JpaSpecificationExecutor { + + /** + * Query the collector in the specified state + * @param status status value + * @return collector list + */ + List findCollectorsByStatus(Byte status); + + /** + * Query collector by name + * @param name collector name + * @return collector + */ + Collector findCollectorByName(String name); +} diff --git a/alerter/src/main/java/org/apache/hertzbeat/alert/reduce/AlarmCommonReduce.java b/alerter/src/main/java/org/apache/hertzbeat/alert/reduce/AlarmCommonReduce.java index a2f7b8d2895..6f958323116 100644 --- a/alerter/src/main/java/org/apache/hertzbeat/alert/reduce/AlarmCommonReduce.java +++ b/alerter/src/main/java/org/apache/hertzbeat/alert/reduce/AlarmCommonReduce.java @@ -53,9 +53,7 @@ public void reduceAndSendAlarm(Alert alert) { alert.setTags(tags); } String monitorIdStr = tags.get(CommonConstants.TAG_MONITOR_ID); - if (monitorIdStr == null) { - log.debug("receiver extern alarm message: {}", alert); - } else { + if (monitorIdStr != null){ long monitorId = Long.parseLong(monitorIdStr); List tagList = alertMonitorDao.findMonitorIdBindTags(monitorId); for (Tag tag : tagList) { @@ -63,6 +61,8 @@ public void reduceAndSendAlarm(Alert alert) { tags.put(tag.getName(), tag.getTagValue()); } } + } else if (tags.get(CommonConstants.TAG_COLLECTOR_NAME) == null){ + log.debug("receiver extern alarm message: {}", alert); } // converge -> silence if (alarmConvergeReduce.filterConverge(alert) && alarmSilenceReduce.filterSilence(alert)) { diff --git a/alerter/src/main/resources/alerter_en_US.properties b/alerter/src/main/resources/alerter_en_US.properties index 938da98ac31..e05324b2d02 100644 --- a/alerter/src/main/resources/alerter_en_US.properties +++ b/alerter/src/main/resources/alerter_en_US.properties @@ -14,6 +14,8 @@ # limitations under the License. alerter.availability.recover = Availability Alert Resolved, Monitor Status Normal Now +alerter.availability.collector.recover = Collector Availability Alert Resolved, The collector is online +alerter.availability.collector.offline = Collector Availability Alert Notify, The collector is offline alerter.alarm.recover = Alert Resolved Notice alerter.notify.title = HertzBeat Alert Notify alerter.notify.target = Monitor Target diff --git a/alerter/src/main/resources/alerter_zh_CN.properties b/alerter/src/main/resources/alerter_zh_CN.properties index b1f11e284de..33f4e88d525 100644 --- a/alerter/src/main/resources/alerter_zh_CN.properties +++ b/alerter/src/main/resources/alerter_zh_CN.properties @@ -14,6 +14,8 @@ # limitations under the License. alerter.availability.recover = 可用性告警恢复通知, 任务状态已恢复正常 +alerter.availability.collector.recover = 采集器可用性恢复通知,采集器已上线 +alerter.availability.collector.offline = 采集器可用性告警通知,采集器已下线 alerter.alarm.recover = 告警恢复通知 alerter.notify.title = HertzBeat告警通知 alerter.notify.target = 告警目标对象 diff --git a/common/src/main/java/org/apache/hertzbeat/common/constants/CommonConstants.java b/common/src/main/java/org/apache/hertzbeat/common/constants/CommonConstants.java index 166d812d74c..11c8efe9d3c 100644 --- a/common/src/main/java/org/apache/hertzbeat/common/constants/CommonConstants.java +++ b/common/src/main/java/org/apache/hertzbeat/common/constants/CommonConstants.java @@ -147,6 +147,11 @@ public interface CommonConstants { */ String AVAILABILITY = "availability"; + /** + * Collector availability + */ + String AVAILABILITY_COLLECTOR = "collectorAvailability"; + /** * Parameter Type Number */ @@ -192,6 +197,26 @@ public interface CommonConstants { */ byte AUTH_TYPE_GITEE = 5; + /** + * Inside the tag: monitorId + */ + String TAG_COLLECTOR_ID = "collectorId"; + + /** + * Inside the tag: collectorName + */ + String TAG_COLLECTOR_NAME = "collectorName"; + + /** + * Inside the tag: collectorHost + */ + String TAG_COLLECTOR_HOST = "collectorHost"; + + /** + * Inside the tag: collectorVersion + */ + String TAG_COLLECTOR_VERSION = "collectorVersion"; + /** * Inside the tag: monitorId Monitor task ID */ diff --git a/common/src/main/java/org/apache/hertzbeat/common/entity/manager/Collector.java b/common/src/main/java/org/apache/hertzbeat/common/entity/manager/Collector.java index c692bab629c..cf5b2a2013f 100644 --- a/common/src/main/java/org/apache/hertzbeat/common/entity/manager/Collector.java +++ b/common/src/main/java/org/apache/hertzbeat/common/entity/manager/Collector.java @@ -20,14 +20,23 @@ import io.swagger.v3.oas.annotations.media.Schema; import jakarta.persistence.Entity; import jakarta.persistence.EntityListeners; +import jakarta.persistence.FetchType; +import jakarta.persistence.ForeignKey; import jakarta.persistence.GeneratedValue; +import jakarta.persistence.CascadeType; +import jakarta.persistence.ConstraintMode; import jakarta.persistence.GenerationType; import jakarta.persistence.Id; import jakarta.persistence.Table; +import jakarta.persistence.JoinColumn; +import jakarta.persistence.JoinTable; import jakarta.persistence.UniqueConstraint; +import jakarta.persistence.ManyToMany; import jakarta.validation.constraints.Min; import jakarta.validation.constraints.NotBlank; import java.time.LocalDateTime; +import java.util.List; + import lombok.AllArgsConstructor; import lombok.Builder; import lombok.Data; @@ -89,4 +98,12 @@ public class Collector { @Schema(title = "Record the latest modification time (timestamp in milliseconds)") @LastModifiedDate private LocalDateTime gmtUpdate; + + @ManyToMany(targetEntity = Tag.class, cascade = {CascadeType.PERSIST, CascadeType.MERGE}, fetch = FetchType.EAGER) + @JoinTable(name = "hzb_tag_collector_bind", + foreignKey = @ForeignKey(ConstraintMode.NO_CONSTRAINT), + inverseForeignKey = @ForeignKey(ConstraintMode.NO_CONSTRAINT), + joinColumns = {@JoinColumn(name = "collector_id", referencedColumnName = "id")}, + inverseJoinColumns = {@JoinColumn(name = "tag_id", referencedColumnName = "id")}) + private List tags; } diff --git a/common/src/main/java/org/apache/hertzbeat/common/entity/manager/TagCollectorBind.java b/common/src/main/java/org/apache/hertzbeat/common/entity/manager/TagCollectorBind.java new file mode 100644 index 00000000000..d71a62da86d --- /dev/null +++ b/common/src/main/java/org/apache/hertzbeat/common/entity/manager/TagCollectorBind.java @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hertzbeat.common.entity.manager; + +import io.swagger.v3.oas.annotations.media.Schema; +import jakarta.persistence.Entity; +import jakarta.persistence.EntityListeners; +import jakarta.persistence.Index; +import jakarta.persistence.Table; +import jakarta.persistence.Id; +import jakarta.persistence.Column; +import jakarta.persistence.GeneratedValue; +import jakarta.persistence.GenerationType; +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; +import org.springframework.data.annotation.CreatedDate; +import org.springframework.data.annotation.LastModifiedDate; +import org.springframework.data.jpa.domain.support.AuditingEntityListener; + +import java.time.LocalDateTime; + +import static io.swagger.v3.oas.annotations.media.Schema.AccessMode.READ_ONLY; +import static io.swagger.v3.oas.annotations.media.Schema.AccessMode.READ_WRITE; + +/** + * Tag Bind Collector + */ +@Entity +@Table(name = "hzb_tag_collector_bind", indexes = { + @Index(name = "index_tag_collector", columnList = "tag_id"), + @Index(name = "index_tag_collector", columnList = "collector_id") +}) +@Data +@Builder +@AllArgsConstructor +@NoArgsConstructor +@Schema(description = "Tag Bind Collector") +@EntityListeners(AuditingEntityListener.class) +public class TagCollectorBind { + + @Id + @GeneratedValue(strategy = GenerationType.IDENTITY) + @Schema(title = "The primary key index ID", example = "87584674384", accessMode = READ_ONLY) + private Long id; + + @Schema(title = "TAG ID", example = "87432674384", accessMode = READ_WRITE) + @Column(name = "tag_id") + private Long tagId; + + @Schema(title = "Collector Id", example = "87432674336", accessMode = READ_WRITE) + @Column(name = "collector_id") + private Long collectorId; + + @Schema(title = "Record create time", example = "1612198922000", accessMode = READ_ONLY) + @CreatedDate + private LocalDateTime gmtCreate; + + @Schema(title = "Record modify time", example = "1612198444000", accessMode = READ_ONLY) + @LastModifiedDate + private LocalDateTime gmtUpdate; +} diff --git a/common/src/main/java/org/apache/hertzbeat/common/support/event/CollectorDeletedEvent.java b/common/src/main/java/org/apache/hertzbeat/common/support/event/CollectorDeletedEvent.java new file mode 100644 index 00000000000..393923c5d26 --- /dev/null +++ b/common/src/main/java/org/apache/hertzbeat/common/support/event/CollectorDeletedEvent.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hertzbeat.common.support.event; + +import org.springframework.context.ApplicationEvent; + +/** + * the event for collector delete + */ +public class CollectorDeletedEvent extends ApplicationEvent { + + private final String identity; + + public CollectorDeletedEvent(Object source, String identity) { + super(source); + this.identity = identity; + } + + public String getIdentity() { + return identity; + } +} diff --git a/common/src/main/java/org/apache/hertzbeat/common/support/event/MonitorDeletedEvent.java b/common/src/main/java/org/apache/hertzbeat/common/support/event/MonitorDeletedEvent.java index 5049a46b5de..a451aa91ba4 100644 --- a/common/src/main/java/org/apache/hertzbeat/common/support/event/MonitorDeletedEvent.java +++ b/common/src/main/java/org/apache/hertzbeat/common/support/event/MonitorDeletedEvent.java @@ -20,7 +20,7 @@ import org.springframework.context.ApplicationEvent; /** - * the event for system config change + * the event for monitor delete */ public class MonitorDeletedEvent extends ApplicationEvent { diff --git a/manager/src/main/java/org/apache/hertzbeat/manager/component/alerter/DispatcherAlarm.java b/manager/src/main/java/org/apache/hertzbeat/manager/component/alerter/DispatcherAlarm.java index faa483e22f8..0a109d2cab9 100644 --- a/manager/src/main/java/org/apache/hertzbeat/manager/component/alerter/DispatcherAlarm.java +++ b/manager/src/main/java/org/apache/hertzbeat/manager/component/alerter/DispatcherAlarm.java @@ -23,6 +23,7 @@ import java.util.Optional; import lombok.extern.slf4j.Slf4j; import org.apache.hertzbeat.alert.AlerterWorkerPool; +import org.apache.hertzbeat.common.constants.CommonConstants; import org.apache.hertzbeat.common.entity.alerter.Alert; import org.apache.hertzbeat.common.entity.manager.NoticeReceiver; import org.apache.hertzbeat.common.entity.manager.NoticeRule; @@ -129,6 +130,9 @@ public void run() { if (alert != null) { // Determining alarm type storage alertStoreHandler.store(alert); + if (alert.getTags() != null && alert.getTags().containsKey(CommonConstants.TAG_COLLECTOR_NAME)){ + continue; + } // Notice distribution sendNotify(alert); // Execute the plugin if enable (Compatible with old version plugins, will be removed in later versions) diff --git a/manager/src/main/java/org/apache/hertzbeat/manager/dao/CollectorDao.java b/manager/src/main/java/org/apache/hertzbeat/manager/dao/CollectorDao.java index 4aefaa2e799..8affacd3ae8 100644 --- a/manager/src/main/java/org/apache/hertzbeat/manager/dao/CollectorDao.java +++ b/manager/src/main/java/org/apache/hertzbeat/manager/dao/CollectorDao.java @@ -17,6 +17,7 @@ package org.apache.hertzbeat.manager.dao; +import java.util.List; import java.util.Optional; import org.apache.hertzbeat.common.entity.manager.Collector; import org.springframework.data.jpa.repository.JpaRepository; @@ -34,7 +35,14 @@ public interface CollectorDao extends JpaRepository, JpaSpecifi * @return collector */ Optional findCollectorByName(String name); - + + /** + * find collectors by names + * @param names collector name list + * @return collector list + */ + List findCollectorsByNameIn(List names); + /** * delete collector by name * @param collector collector name diff --git a/manager/src/main/java/org/apache/hertzbeat/manager/dao/TagCollectorBindDao.java b/manager/src/main/java/org/apache/hertzbeat/manager/dao/TagCollectorBindDao.java new file mode 100644 index 00000000000..a595fcc0e29 --- /dev/null +++ b/manager/src/main/java/org/apache/hertzbeat/manager/dao/TagCollectorBindDao.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hertzbeat.manager.dao; + +import org.apache.hertzbeat.common.entity.manager.TagCollectorBind; +import org.springframework.data.jpa.repository.JpaRepository; +import org.springframework.data.jpa.repository.JpaSpecificationExecutor; + +/** + * TagCollectorBindDao repository + */ +public interface TagCollectorBindDao extends JpaRepository, JpaSpecificationExecutor { + + /** + * delete tags bind by collectorId + * @param collectorId collectorId + */ + void deleteTagCollectorBindByCollectorId(Long collectorId); +} diff --git a/manager/src/main/java/org/apache/hertzbeat/manager/scheduler/CollectorJobScheduler.java b/manager/src/main/java/org/apache/hertzbeat/manager/scheduler/CollectorJobScheduler.java index a935061d0c3..c9028edfd7c 100644 --- a/manager/src/main/java/org/apache/hertzbeat/manager/scheduler/CollectorJobScheduler.java +++ b/manager/src/main/java/org/apache/hertzbeat/manager/scheduler/CollectorJobScheduler.java @@ -21,6 +21,7 @@ import java.util.HashSet; import java.util.LinkedList; import java.util.List; +import java.util.ArrayList; import java.util.Map; import java.util.Objects; import java.util.Set; @@ -41,6 +42,7 @@ import org.apache.hertzbeat.common.entity.manager.CollectorMonitorBind; import org.apache.hertzbeat.common.entity.manager.Monitor; import org.apache.hertzbeat.common.entity.manager.Param; +import org.apache.hertzbeat.common.entity.manager.Tag; import org.apache.hertzbeat.common.entity.manager.ParamDefine; import org.apache.hertzbeat.common.entity.message.ClusterMsg; import org.apache.hertzbeat.common.entity.message.CollectRep; @@ -108,6 +110,22 @@ public void collectorGoOnline(String identity, CollectorInfo collectorInfo) { collector.setMode(collectorInfo.getMode()); collector.setVersion(collectorInfo.getVersion()); } + List tags = collector.getTags(); + if (tags == null){ + tags = new ArrayList<>(); + collector.setTags(tags); + } + + boolean hasNameTag = false; + for (Tag tag : tags) { + if (CommonConstants.TAG_COLLECTOR_NAME.equals(tag.getName())){ + tag.setTagValue(collector.getName()); + hasNameTag = true; + } + } + if (!hasNameTag){ + tags.add(Tag.builder().name(CommonConstants.TAG_COLLECTOR_NAME).tagValue(collector.getName()).type((byte) 0).build()); + } } else { if (collectorInfo == null) { log.error("collectorInfo can not null when collector not existed"); @@ -120,6 +138,9 @@ public void collectorGoOnline(String identity, CollectorInfo collectorInfo) { .version(collectorInfo.getVersion()) .status(CommonConstants.COLLECTOR_STATUS_ONLINE) .build(); + List tags = new LinkedList<>(); + collector.setTags(tags); + tags.add(Tag.builder().name(CommonConstants.TAG_COLLECTOR_NAME).tagValue(collector.getName()).type((byte) 0).build()); } collectorDao.save(collector); ConsistentHash.Node node = new ConsistentHash.Node(identity, collector.getMode(), diff --git a/manager/src/main/java/org/apache/hertzbeat/manager/scheduler/netty/ManageServer.java b/manager/src/main/java/org/apache/hertzbeat/manager/scheduler/netty/ManageServer.java index afafa416906..6a5aae24980 100644 --- a/manager/src/main/java/org/apache/hertzbeat/manager/scheduler/netty/ManageServer.java +++ b/manager/src/main/java/org/apache/hertzbeat/manager/scheduler/netty/ManageServer.java @@ -24,6 +24,7 @@ import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import lombok.extern.slf4j.Slf4j; +import org.apache.hertzbeat.alert.calculate.CollectorAlarmHandler; import org.apache.hertzbeat.common.entity.message.ClusterMsg; import org.apache.hertzbeat.common.support.CommonThreadPool; import org.apache.hertzbeat.manager.scheduler.CollectorJobScheduler; @@ -55,6 +56,8 @@ public class ManageServer implements CommandLineRunner { private final CollectorJobScheduler collectorJobScheduler; + private final CollectorAlarmHandler collectorAlarmHandler; + private ScheduledExecutorService channelSchedule; private RemotingServer remotingServer; @@ -63,9 +66,11 @@ public class ManageServer implements CommandLineRunner { public ManageServer(final SchedulerProperties schedulerProperties, final CollectorJobScheduler collectorJobScheduler, - final CommonThreadPool threadPool) { + final CommonThreadPool threadPool, + final CollectorAlarmHandler collectorAlarmHandler) { this.collectorJobScheduler = collectorJobScheduler; this.collectorJobScheduler.setManageServer(this); + this.collectorAlarmHandler = collectorAlarmHandler; this.init(schedulerProperties, threadPool); } @@ -96,6 +101,7 @@ public void start() { channel.closeFuture(); this.clientChannelTable.remove(collector); this.collectorJobScheduler.collectorGoOffline(collector); + this.collectorAlarmHandler.offline(collector); } }); } catch (Exception e) { @@ -129,6 +135,7 @@ public void addChannel(final String identity, Channel channel) { preChannel.close(); } this.clientChannelTable.put(identity, channel); + this.collectorAlarmHandler.online(identity); } public void closeChannel(final String identity) { diff --git a/manager/src/main/java/org/apache/hertzbeat/manager/service/TagService.java b/manager/src/main/java/org/apache/hertzbeat/manager/service/TagService.java index 4e9f399cbad..64fa84e12c8 100644 --- a/manager/src/main/java/org/apache/hertzbeat/manager/service/TagService.java +++ b/manager/src/main/java/org/apache/hertzbeat/manager/service/TagService.java @@ -20,6 +20,8 @@ import java.util.HashSet; import java.util.List; import java.util.Set; + +import org.apache.hertzbeat.common.entity.manager.Collector; import org.apache.hertzbeat.common.entity.manager.Monitor; import org.apache.hertzbeat.common.entity.manager.Tag; import org.springframework.data.domain.Page; @@ -69,4 +71,10 @@ public interface TagService { * @param monitor monitor */ void deleteMonitorSystemTags(Monitor monitor); + + /** + * remove collector system tags + * @param collector collector + */ + void deleteCollectorSystemTags(Collector collector); } diff --git a/manager/src/main/java/org/apache/hertzbeat/manager/service/impl/CollectorServiceImpl.java b/manager/src/main/java/org/apache/hertzbeat/manager/service/impl/CollectorServiceImpl.java index b87965b41ae..9603010fa39 100644 --- a/manager/src/main/java/org/apache/hertzbeat/manager/service/impl/CollectorServiceImpl.java +++ b/manager/src/main/java/org/apache/hertzbeat/manager/service/impl/CollectorServiceImpl.java @@ -27,15 +27,19 @@ import org.apache.hertzbeat.common.entity.dto.CollectorSummary; import org.apache.hertzbeat.common.entity.manager.Collector; import org.apache.hertzbeat.common.entity.manager.CollectorMonitorBind; +import org.apache.hertzbeat.common.support.event.CollectorDeletedEvent; import org.apache.hertzbeat.common.support.exception.CommonException; import org.apache.hertzbeat.common.util.IpDomainUtil; import org.apache.hertzbeat.manager.dao.CollectorDao; import org.apache.hertzbeat.manager.dao.CollectorMonitorBindDao; +import org.apache.hertzbeat.manager.dao.TagCollectorBindDao; import org.apache.hertzbeat.manager.scheduler.AssignJobs; import org.apache.hertzbeat.manager.scheduler.ConsistentHash; import org.apache.hertzbeat.manager.scheduler.netty.ManageServer; import org.apache.hertzbeat.manager.service.CollectorService; +import org.apache.hertzbeat.manager.service.TagService; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.ApplicationContext; import org.springframework.data.domain.Page; import org.springframework.data.domain.PageImpl; import org.springframework.data.domain.PageRequest; @@ -61,7 +65,16 @@ public class CollectorServiceImpl implements CollectorService { @Autowired(required = false) private ManageServer manageServer; - + + @Autowired(required = false) + private ApplicationContext applicationContext; + + @Autowired + private TagService tagService; + + @Autowired + private TagCollectorBindDao tagCollectorBindDao; + @Override @Transactional(readOnly = true) public Page getCollectors(String name, int pageIndex, Integer pageSize) { @@ -105,9 +118,13 @@ public void deleteRegisteredCollector(List collectors) { throw new CommonException("The collector " + collector + " has pinned tasks that cannot be deleted."); } }); - collectors.forEach(collector -> { - this.manageServer.closeChannel(collector); - this.collectorDao.deleteCollectorByName(collector); + List collectorList = this.collectorDao.findCollectorsByNameIn(collectors); + collectorList.forEach(collector -> { + this.manageServer.closeChannel(collector.getName()); + this.collectorDao.deleteCollectorByName(collector.getName()); + this.tagService.deleteCollectorSystemTags(collector); + this.tagCollectorBindDao.deleteTagCollectorBindByCollectorId(collector.getId()); + this.applicationContext.publishEvent(new CollectorDeletedEvent(this, collector.getName())); }); } diff --git a/manager/src/main/java/org/apache/hertzbeat/manager/service/impl/TagServiceImpl.java b/manager/src/main/java/org/apache/hertzbeat/manager/service/impl/TagServiceImpl.java index 4a457e63512..b931845ea55 100644 --- a/manager/src/main/java/org/apache/hertzbeat/manager/service/impl/TagServiceImpl.java +++ b/manager/src/main/java/org/apache/hertzbeat/manager/service/impl/TagServiceImpl.java @@ -29,6 +29,7 @@ import lombok.extern.slf4j.Slf4j; import org.apache.commons.collections.CollectionUtils; import org.apache.commons.lang3.StringUtils; +import org.apache.hertzbeat.common.entity.manager.Collector; import org.apache.hertzbeat.common.entity.manager.Monitor; import org.apache.hertzbeat.common.entity.manager.Tag; import org.apache.hertzbeat.common.support.exception.CommonException; @@ -124,7 +125,7 @@ public Page getTags(String search, Byte type, int pageIndex, int pageSize) @Override public void deleteTags(HashSet ids) { - if (CollectionUtils.isEmpty(ids)){ + if (CollectionUtils.isEmpty(ids)) { return; } if (tagMonitorBindDao.countByTagIdIn(ids) != 0) { @@ -141,9 +142,16 @@ public List listTag(Set ids) { @Override public void deleteMonitorSystemTags(Monitor monitor) { if (CollectionUtils.isNotEmpty(monitor.getTags())) { - List tags = monitor.getTags().stream().filter(tag -> Objects.nonNull(tag.getType()) && tag.getType() == (byte) 0).collect(Collectors.toList()); + List tags = monitor.getTags().stream().filter(tag -> Objects.nonNull(tag.getType()) && tag.getType() == (byte) 0).collect(Collectors.toList()); tagDao.deleteAll(tags); } } + @Override + public void deleteCollectorSystemTags(Collector collector) { + if (CollectionUtils.isNotEmpty(collector.getTags())) { + List tags = collector.getTags().stream().filter(tag -> Objects.nonNull(tag.getType()) && tag.getType() == (byte) 0).toList(); + tagDao.deleteAll(tags); + } + } } diff --git a/manager/src/test/java/org/apache/hertzbeat/manager/dao/CollectorDaoTest.java b/manager/src/test/java/org/apache/hertzbeat/manager/dao/CollectorDaoTest.java index fab01262500..997ebeaa565 100644 --- a/manager/src/test/java/org/apache/hertzbeat/manager/dao/CollectorDaoTest.java +++ b/manager/src/test/java/org/apache/hertzbeat/manager/dao/CollectorDaoTest.java @@ -19,6 +19,7 @@ import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.junit.jupiter.api.Assertions.assertFalse; import javax.annotation.Resource; import org.apache.hertzbeat.common.entity.manager.Collector; import org.apache.hertzbeat.manager.AbstractSpringIntegrationTest; @@ -27,6 +28,8 @@ import org.junit.jupiter.api.Test; import org.springframework.transaction.annotation.Transactional; +import java.util.List; + /** * Test case for {@link CollectorDao} */ @@ -66,5 +69,8 @@ public void findCollectorByName() { assertTrue(collectorDao.findCollectorByName("test").isPresent()); } - + @Test + public void findCollectorsByNameIn(){ + assertFalse(collectorDao.findCollectorsByNameIn(List.of("test")).isEmpty()); + } } diff --git a/manager/src/test/java/org/apache/hertzbeat/manager/dao/TagDaoTest.java b/manager/src/test/java/org/apache/hertzbeat/manager/dao/TagDaoTest.java index 8aa6de244a5..ecc69f6620b 100644 --- a/manager/src/test/java/org/apache/hertzbeat/manager/dao/TagDaoTest.java +++ b/manager/src/test/java/org/apache/hertzbeat/manager/dao/TagDaoTest.java @@ -24,8 +24,11 @@ import jakarta.annotation.Resource; import java.time.LocalDateTime; import java.util.List; +import java.util.Objects; import java.util.Set; import java.util.stream.Collectors; + +import org.apache.hertzbeat.common.constants.CommonConstants; import org.apache.hertzbeat.common.entity.manager.Tag; import org.apache.hertzbeat.manager.AbstractSpringIntegrationTest; import org.junit.jupiter.api.AfterEach; @@ -71,11 +74,15 @@ void deleteTagsByIdIn() { assertNotNull(tagList); assertFalse(tagList.isEmpty()); - Set ids = tagList.stream().map(Tag::getId).collect(Collectors.toSet()); + Set ids = tagList.stream() + .filter(tag -> !Objects.equals(tag.getName(), CommonConstants.TAG_COLLECTOR_NAME)) + .map(Tag::getId).collect(Collectors.toSet()); assertDoesNotThrow(() -> tagDao.deleteTagsByIdIn(ids)); + int count = tagList.size() - ids.size(); + tagList = tagDao.findAll(); assertNotNull(tagList); - assertTrue(tagList.isEmpty()); + assertTrue(tagList.size() == count); } } diff --git a/manager/src/test/java/org/apache/hertzbeat/manager/service/CollectorServiceTest.java b/manager/src/test/java/org/apache/hertzbeat/manager/service/CollectorServiceTest.java index c31ec4c8b6c..f5dc2095b76 100644 --- a/manager/src/test/java/org/apache/hertzbeat/manager/service/CollectorServiceTest.java +++ b/manager/src/test/java/org/apache/hertzbeat/manager/service/CollectorServiceTest.java @@ -41,6 +41,7 @@ import org.mockito.Mock; import org.mockito.Spy; import org.mockito.junit.jupiter.MockitoExtension; +import org.springframework.context.ApplicationContext; import org.springframework.data.domain.Page; import org.springframework.data.domain.PageRequest; import org.springframework.data.jpa.domain.Specification; @@ -67,6 +68,9 @@ public class CollectorServiceTest { @Mock private ManageServer manageServer; + @Mock + private ApplicationContext applicationContext; + @Test public void getCollectors() { diff --git a/web-app/src/app/routes/alert/alert-center/alert-center.component.html b/web-app/src/app/routes/alert/alert-center/alert-center.component.html index 16b4381e6ea..b35ecd1be3b 100644 --- a/web-app/src/app/routes/alert/alert-center/alert-center.component.html +++ b/web-app/src/app/routes/alert/alert-center/alert-center.component.html @@ -138,7 +138,13 @@ {{ sliceTagName(tag) }} - + + + + {{ sliceTagName(tag) }} + + + {{ sliceTagName(tag) }} diff --git a/web-app/src/app/routes/alert/alert-center/alert-center.component.ts b/web-app/src/app/routes/alert/alert-center/alert-center.component.ts index 713dc9d8841..e4de5242d5e 100644 --- a/web-app/src/app/routes/alert/alert-center/alert-center.component.ts +++ b/web-app/src/app/routes/alert/alert-center/alert-center.component.ts @@ -116,6 +116,9 @@ export class AlertCenterComponent implements OnInit { if (target === 'availability') { return this.i18nSvc.fanyi('monitor.availability'); } + if (target == 'collectorAvailability') { + return this.i18nSvc.fanyi('monitor.collector.availability'); + } return target; } diff --git a/web-app/src/assets/i18n/en-US.json b/web-app/src/assets/i18n/en-US.json index d5c4d448ed9..a8d20b789d4 100644 --- a/web-app/src/assets/i18n/en-US.json +++ b/web-app/src/assets/i18n/en-US.json @@ -75,6 +75,7 @@ "collector": "Collector", "collector.tip": "Choose which collector to dispatch this monitoring", "collector.system.default": "Default System Dispatch", + "collector.availability": "Collector Availability", "collector.status.online": "Online", "collector.status.offline": "Offline", "category": { diff --git a/web-app/src/assets/i18n/zh-CN.json b/web-app/src/assets/i18n/zh-CN.json index 99482d15ebe..5acb5332408 100644 --- a/web-app/src/assets/i18n/zh-CN.json +++ b/web-app/src/assets/i18n/zh-CN.json @@ -75,6 +75,7 @@ "collector": "采集器", "collector.tip": "配置此监控使用哪台采集器调度采集", "collector.system.default": "默认系统调度", + "collector.availability": "采集器可用性", "collector.status.online": "在线", "collector.status.offline": "离线", "category": { diff --git a/web-app/src/assets/i18n/zh-TW.json b/web-app/src/assets/i18n/zh-TW.json index d873c2fe8bc..15b497ca1b8 100644 --- a/web-app/src/assets/i18n/zh-TW.json +++ b/web-app/src/assets/i18n/zh-TW.json @@ -89,6 +89,7 @@ "collector": "採集器", "collector.tip": "配置此監控使用哪台採集器調度採集", "collector.system.default": "默認系統調度", + "collector.availability": "採集器可用性", "collector.status.online": "在線", "collector.status.offline": "離線", "category": {