From 6074c08e1a125580e90816b303769bf121b49189 Mon Sep 17 00:00:00 2001 From: liutianyou Date: Thu, 29 Aug 2024 11:05:54 +0800 Subject: [PATCH] [feat] support monitor MQTT connections (#2618) Co-authored-by: shown Co-authored-by: Logic --- collector/pom.xml | 6 + .../collect/mqtt/MqttCollectImpl.java | 231 ++++++++++++++++++ .../collector/dispatch/DispatchConstants.java | 5 + ...ertzbeat.collector.collect.AbstractCollect | 1 + .../hertzbeat/common/entity/job/Metrics.java | 5 + .../entity/job/protocol/MqttProtocol.java | 95 +++++++ .../src/main/resources/define/app-mqtt.yml | 170 +++++++++++++ material/licenses/collector/LICENSE | 7 +- 8 files changed, 517 insertions(+), 3 deletions(-) create mode 100644 collector/src/main/java/org/apache/hertzbeat/collector/collect/mqtt/MqttCollectImpl.java create mode 100644 common/src/main/java/org/apache/hertzbeat/common/entity/job/protocol/MqttProtocol.java create mode 100644 manager/src/main/resources/define/app-mqtt.yml diff --git a/collector/pom.xml b/collector/pom.xml index 2d93db59718..fea17e3844d 100644 --- a/collector/pom.xml +++ b/collector/pom.xml @@ -206,6 +206,12 @@ + + + com.hivemq + hivemq-mqtt-client + 1.3.3 + diff --git a/collector/src/main/java/org/apache/hertzbeat/collector/collect/mqtt/MqttCollectImpl.java b/collector/src/main/java/org/apache/hertzbeat/collector/collect/mqtt/MqttCollectImpl.java new file mode 100644 index 00000000000..14f8e18145a --- /dev/null +++ b/collector/src/main/java/org/apache/hertzbeat/collector/collect/mqtt/MqttCollectImpl.java @@ -0,0 +1,231 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hertzbeat.collector.collect.mqtt; + +import com.hivemq.client.mqtt.MqttVersion; +import com.hivemq.client.mqtt.datatypes.MqttQos; +import com.hivemq.client.mqtt.mqtt3.Mqtt3AsyncClient; +import com.hivemq.client.mqtt.mqtt3.Mqtt3Client; +import com.hivemq.client.mqtt.mqtt3.Mqtt3ClientBuilder; +import com.hivemq.client.mqtt.mqtt3.message.connect.connack.Mqtt3ConnAck; +import com.hivemq.client.mqtt.mqtt5.Mqtt5AsyncClient; +import com.hivemq.client.mqtt.mqtt5.Mqtt5Client; +import com.hivemq.client.mqtt.mqtt5.Mqtt5ClientBuilder; +import com.hivemq.client.mqtt.mqtt5.message.connect.connack.Mqtt5ConnAck; +import java.nio.charset.StandardCharsets; +import java.util.Arrays; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.function.Consumer; +import java.util.stream.Collectors; +import org.apache.commons.lang3.StringUtils; +import org.apache.hertzbeat.collector.collect.AbstractCollect; +import org.apache.hertzbeat.collector.constants.CollectorConstants; +import org.apache.hertzbeat.collector.dispatch.DispatchConstants; +import org.apache.hertzbeat.common.constants.CommonConstants; +import org.apache.hertzbeat.common.entity.job.Metrics; +import org.apache.hertzbeat.common.entity.job.protocol.MqttProtocol; +import org.apache.hertzbeat.common.entity.message.CollectRep; +import org.apache.hertzbeat.common.entity.message.CollectRep.MetricsData.Builder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.util.Assert; +import org.springframework.util.StopWatch; + +/** + * collect mqtt metrics + */ +public class MqttCollectImpl extends AbstractCollect { + + public MqttCollectImpl() { + } + + private static final Logger logger = LoggerFactory.getLogger(MqttCollectImpl.class); + + @Override + public void preCheck(Metrics metrics) throws IllegalArgumentException { + MqttProtocol mqttProtocol = metrics.getMqtt(); + Assert.hasText(mqttProtocol.getHost(), "MQTT protocol host is required"); + Assert.hasText(mqttProtocol.getPort(), "MQTT protocol port is required"); + Assert.hasText(mqttProtocol.getProtocolVersion(), "MQTT protocol version is required"); + } + + @Override + public void collect(Builder builder, long monitorId, String app, Metrics metrics) { + MqttProtocol mqtt = metrics.getMqtt(); + String protocolVersion = mqtt.getProtocolVersion(); + MqttVersion mqttVersion = MqttVersion.valueOf(protocolVersion); + if (mqttVersion == MqttVersion.MQTT_3_1_1) { + collectWithVersion3(metrics, builder); + } else if (mqttVersion == MqttVersion.MQTT_5_0) { + collectWithVersion5(metrics, builder); + } + } + + @Override + public String supportProtocol() { + return DispatchConstants.PROTOCOL_MQTT; + } + + /** + * collecting data of MQTT 5 + */ + private void collectWithVersion5(Metrics metrics, Builder builder) { + MqttProtocol mqttProtocol = metrics.getMqtt(); + Map data = new HashMap<>(); + Mqtt5AsyncClient client = buildMqtt5Client(mqttProtocol); + long responseTime = connectClient(client, mqtt5AsyncClient -> { + CompletableFuture connectFuture = mqtt5AsyncClient.connect(); + try { + connectFuture.get(Long.parseLong(mqttProtocol.getTimeout()), TimeUnit.MILLISECONDS); + } catch (InterruptedException | ExecutionException | TimeoutException e) { + builder.setCode(CollectRep.Code.FAIL); + builder.setMsg(getErrorMessage(e.getMessage())); + } + }); + testDescribeAndPublish5(client, mqttProtocol, data); + convertToMetricsData(builder, metrics, responseTime, data); + client.disconnect(); + } + + /** + * collecting data of MQTT 3.1.1 + */ + private void collectWithVersion3(Metrics metrics, Builder builder) { + MqttProtocol mqttProtocol = metrics.getMqtt(); + Map data = new HashMap<>(); + Mqtt3AsyncClient client = buildMqtt3Client(mqttProtocol); + long responseTime = connectClient(client, mqtt3AsyncClient -> { + CompletableFuture connectFuture = mqtt3AsyncClient.connect(); + try { + connectFuture.get(Long.parseLong(mqttProtocol.getTimeout()), TimeUnit.MILLISECONDS); + } catch (InterruptedException | ExecutionException | TimeoutException e) { + builder.setCode(CollectRep.Code.FAIL); + builder.setMsg(getErrorMessage(e.getMessage())); + } + }); + testDescribeAndPublish3(client, mqttProtocol, data); + convertToMetricsData(builder, metrics, responseTime, data); + client.disconnect(); + } + + private void testDescribeAndPublish3(Mqtt3AsyncClient client, MqttProtocol mqttProtocol, Map data) { + data.put("canDescribe", test(() -> { + client.subscribeWith().topicFilter(mqttProtocol.getTopic()).qos(MqttQos.AT_LEAST_ONCE).send(); + client.unsubscribeWith().topicFilter(mqttProtocol.getTopic()).send(); + }, "subscribe").toString()); + + data.put("canPublish", !mqttProtocol.testPublish() ? Boolean.FALSE.toString() : test(() -> { + client.publishWith().topic(mqttProtocol.getTopic()) + .payload(mqttProtocol.getTestMessage().getBytes(StandardCharsets.UTF_8)) + .qos(MqttQos.AT_LEAST_ONCE).send(); + data.put("canPublish", Boolean.TRUE.toString()); + }, "publish").toString()); + } + + private void testDescribeAndPublish5(Mqtt5AsyncClient client, MqttProtocol mqttProtocol, Map data) { + data.put("canDescribe", test(() -> { + client.subscribeWith().topicFilter(mqttProtocol.getTopic()).qos(MqttQos.AT_LEAST_ONCE).send(); + client.unsubscribeWith().topicFilter(mqttProtocol.getTopic()).send(); + }, "subscribe").toString()); + + data.put("canPublish", !mqttProtocol.testPublish() ? Boolean.FALSE.toString() : test(() -> { + client.publishWith().topic(mqttProtocol.getTopic()) + .payload(mqttProtocol.getTestMessage().getBytes(StandardCharsets.UTF_8)) + .qos(MqttQos.AT_LEAST_ONCE).send(); + data.put("canPublish", Boolean.TRUE.toString()); + }, "publish").toString()); + } + + private Mqtt5AsyncClient buildMqtt5Client(MqttProtocol mqttProtocol) { + Mqtt5ClientBuilder mqtt5ClientBuilder = Mqtt5Client.builder() + .serverHost(mqttProtocol.getHost()) + .identifier(mqttProtocol.getClientId()) + .serverPort(Integer.parseInt(mqttProtocol.getPort())); + + if (mqttProtocol.hasAuth()) { + mqtt5ClientBuilder.simpleAuth().username(mqttProtocol.getUsername()) + .password(mqttProtocol.getPassword().getBytes(StandardCharsets.UTF_8)) + .applySimpleAuth(); + } + return mqtt5ClientBuilder.buildAsync(); + } + + private Mqtt3AsyncClient buildMqtt3Client(MqttProtocol mqttProtocol) { + + Mqtt3ClientBuilder mqtt3ClientBuilder = Mqtt3Client.builder() + .serverHost(mqttProtocol.getHost()) + .identifier(mqttProtocol.getClientId()) + .serverPort(Integer.parseInt(mqttProtocol.getPort())); + + if (mqttProtocol.hasAuth()) { + mqtt3ClientBuilder.simpleAuth().username(mqttProtocol.getUsername()) + .password(mqttProtocol.getPassword().getBytes(StandardCharsets.UTF_8)) + .applySimpleAuth(); + } + return mqtt3ClientBuilder.buildAsync(); + } + + public long connectClient(T client, Consumer connect) { + StopWatch stopWatch = new StopWatch(); + stopWatch.start(); + connect.accept(client); + stopWatch.stop(); + return stopWatch.getTotalTimeMillis(); + } + + private void convertToMetricsData(Builder builder, Metrics metrics, long responseTime, Map data) { + CollectRep.ValueRow.Builder valueRowBuilder = CollectRep.ValueRow.newBuilder(); + for (String column : metrics.getAliasFields()) { + if (CollectorConstants.RESPONSE_TIME.equals(column)) { + valueRowBuilder.addColumns(String.valueOf(responseTime)); + } else { + String value = data.get(column); + value = value == null ? CommonConstants.NULL_VALUE : value; + valueRowBuilder.addColumns(value); + } + } + builder.addValues(valueRowBuilder.build()); + } + + private Boolean test(Runnable runnable, String operationName) { + try { + runnable.run(); + return true; + } catch (Exception e) { + logger.error("{} fail", operationName, e); + } + return false; + } + + private String getErrorMessage(String errorMessage) { + if (StringUtils.isBlank(errorMessage)) { + return "connect failed"; + } + String[] split = errorMessage.split(":"); + if (split.length > 1) { + return Arrays.stream(split).skip(1).collect(Collectors.joining(":")); + } + return errorMessage; + } + +} diff --git a/collector/src/main/java/org/apache/hertzbeat/collector/dispatch/DispatchConstants.java b/collector/src/main/java/org/apache/hertzbeat/collector/dispatch/DispatchConstants.java index 0dac9c39a14..8be5eab9bc0 100644 --- a/collector/src/main/java/org/apache/hertzbeat/collector/dispatch/DispatchConstants.java +++ b/collector/src/main/java/org/apache/hertzbeat/collector/dispatch/DispatchConstants.java @@ -137,6 +137,11 @@ public interface DispatchConstants { */ String PROTOCOL_SCRIPT = "script"; + /** + * protocol mqtt + */ + String PROTOCOL_MQTT = "mqtt"; + // Protocol type related - end // http protocol related - start should reuse HttpHeaders as much as possible diff --git a/collector/src/main/resources/META-INF/services/org.apache.hertzbeat.collector.collect.AbstractCollect b/collector/src/main/resources/META-INF/services/org.apache.hertzbeat.collector.collect.AbstractCollect index ddf77d2f783..91ee9e1b513 100644 --- a/collector/src/main/resources/META-INF/services/org.apache.hertzbeat.collector.collect.AbstractCollect +++ b/collector/src/main/resources/META-INF/services/org.apache.hertzbeat.collector.collect.AbstractCollect @@ -25,3 +25,4 @@ org.apache.hertzbeat.collector.collect.redfish.RedfishCollectImpl org.apache.hertzbeat.collector.collect.nebulagraph.NgqlCollectImpl org.apache.hertzbeat.collector.collect.imap.ImapCollectImpl org.apache.hertzbeat.collector.collect.script.ScriptCollectImpl +org.apache.hertzbeat.collector.collect.mqtt.MqttCollectImpl diff --git a/common/src/main/java/org/apache/hertzbeat/common/entity/job/Metrics.java b/common/src/main/java/org/apache/hertzbeat/common/entity/job/Metrics.java index 99d25fa6ac8..37b96db3307 100644 --- a/common/src/main/java/org/apache/hertzbeat/common/entity/job/Metrics.java +++ b/common/src/main/java/org/apache/hertzbeat/common/entity/job/Metrics.java @@ -38,6 +38,7 @@ import org.apache.hertzbeat.common.entity.job.protocol.JmxProtocol; import org.apache.hertzbeat.common.entity.job.protocol.MemcachedProtocol; import org.apache.hertzbeat.common.entity.job.protocol.MongodbProtocol; +import org.apache.hertzbeat.common.entity.job.protocol.MqttProtocol; import org.apache.hertzbeat.common.entity.job.protocol.NebulaGraphProtocol; import org.apache.hertzbeat.common.entity.job.protocol.NginxProtocol; import org.apache.hertzbeat.common.entity.job.protocol.NgqlProtocol; @@ -225,6 +226,10 @@ public class Metrics { * Monitoring configuration information using the public script protocol */ private ScriptProtocol script; + /** + * Monitoring configuration information using the public mqtt protocol + */ + private MqttProtocol mqtt; /** * collector use - Temporarily store subTask metrics response data diff --git a/common/src/main/java/org/apache/hertzbeat/common/entity/job/protocol/MqttProtocol.java b/common/src/main/java/org/apache/hertzbeat/common/entity/job/protocol/MqttProtocol.java new file mode 100644 index 00000000000..c14c162211d --- /dev/null +++ b/common/src/main/java/org/apache/hertzbeat/common/entity/job/protocol/MqttProtocol.java @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hertzbeat.common.entity.job.protocol; + +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; +import org.apache.commons.lang3.StringUtils; + +/** + * mqtt protocol + */ +@Data +@Builder +@AllArgsConstructor +@NoArgsConstructor +public class MqttProtocol { + + /** + * ip address or domain name of the peer host + */ + private String host; + + /** + * peer host port + */ + private String port; + + /** + * username + */ + private String username; + + /** + * password + */ + private String password; + + /** + * time out period + */ + private String timeout; + + /** + * client id + */ + private String clientId; + + /** + * message used to test whether the mqtt connection can be pushed normally + */ + private String testMessage; + + /** + * protocol version of mqtt + */ + private String protocolVersion; + + /** + * monitor topic + */ + private String topic; + + /** + * Determine whether authentication is required + * @return true if it has auth info + */ + public boolean hasAuth() { + return StringUtils.isNotBlank(this.username) && StringUtils.isNotBlank(this.password); + } + + /** + * Determine whether you need to test whether messages can be pushed normally + * @return turn if it has test message + */ + public boolean testPublish(){ + return StringUtils.isNotBlank(this.testMessage); + } +} diff --git a/manager/src/main/resources/define/app-mqtt.yml b/manager/src/main/resources/define/app-mqtt.yml new file mode 100644 index 00000000000..c8f98532777 --- /dev/null +++ b/manager/src/main/resources/define/app-mqtt.yml @@ -0,0 +1,170 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# The monitoring type category:service-application service monitoring db-database monitoring mid-middleware custom-custom monitoring os-operating system monitoring +category: service +# The monitoring type eg: linux windows tomcat mysql aws... +app: mqtt +# The app api i18n name +name: + zh-CN: MQTT 连接 + en-US: MQTT Connection +# The description and help of this monitoring type +help: + zh-CN: HertzBeat 对 MQTT 连接进行监测。
您可以点击 “新建 MQTT 连接” 并进行配置,或者选择“更多操作”,导入已有配置。 + en-US: HertzBeat monitors MQTT connections.
You can click "New MQTT connection" and configure it, or select "More actions" to import an existing configuration. + zh-TW: HertzBeat 對 MQTT 連接進行監測。
您可以點選 “新建 MQTT 連線” 並進行配置,或選擇“更多操作”,匯入已有配置。 +# Input params define for monitoring(render web ui by the definition) +params: + # field-param field key + - field: host + # name-param field display i18n name + name: + zh-CN: MQTT的Host + en-US: Target Host + # type-param field type(most mapping the html input type) + type: host + # required-true or false + required: true + # field-param field key + - field: port + # name-param field display i18n name + name: + zh-CN: 端口 + en-US: Port + # type-param field type(most mapping the html input type) + type: number + # when type is number, range is required + range: '[0,65535]' + # required-true or false + required: true + # default value 1883 + defaultValue: 1883 + - field: protocolVersion + name: + zh-CN: 协议版本 + en-US: Protocol version + type: radio + options: + - label: MQTT 3.1.1 + value: MQTT_3_1_1 + - label: MQTT 5.0 + value: MQTT_5_0 + required: true + defaultValue: MQTT_3_1_1 + # field-param field key + - field: timeout + # name-param field display i18n name + name: + zh-CN: 连接超时时间(ms) + en-US: Connect Timeout(ms) + # type-param field type(most mapping the html input type) + type: number + # when type is number, range is required + range: '[0,100000]' + # required-true or false + required: true + # default value 6000 + defaultValue: 6000 + # field-param field key + - field: username + name: + zh-CN: 用户名 + en-US: Username + type: text + hide: true + # required-true or false + required: false + - field: password + name: + zh-CN: 密码 + en-US: Password + type: text + hide: true + # required-true or false + required: false + - field: clientId + name: + zh-CN: 客户端ID + en-US: Client Id + type: text + defaultValue: hertzbeat-mqtt-client + # required-true or false + required: true + + - field: topic + name: + zh-CN: 主题 + en-US: Topic + type: text + required: true + - field: testMessage + name: + zh-CN: 测试消息 + en-US: Test message + type: text + required: false +# collect metrics config list +metrics: + # metrics - summary + - name: summary + i18n: + zh-CN: 概要 + en-US: Summary + # metrics scheduling priority(0->127)->(high->low), metrics with the same priority will be scheduled in parallel + # priority 0's metrics is availability metrics, it will be scheduled first, only availability metrics collect success will the scheduling continue + priority: 0 + # field-metric name, type-metric type(0-number,1-string), unit-metric unit('%','ms','MB'), label-whether it is a metrics label field + fields: + - field: responseTime + type: 0 + unit: ms + i18n: + zh-CN: 响应时间 + en-US: Response Time + - field: canDescribe + type: 1 + i18n: + zh-CN: 正常订阅 + en-US: Normal subscription + - field: canPublish + type: 1 + i18n: + zh-CN: 正常推送 + en-US: Normal publish + # the protocol used for monitoring, eg: sql, ssh, http, telnet, wmi, snmp, sdk + protocol: mqtt + # Specific collection configuration when protocol is telnet protocol + mqtt: + # telnet host + host: ^_^host^_^ + # port + port: ^_^port^_^ + # timeout + timeout: ^_^timeout^_^ + # email + topic: ^_^topic^_^ + # clientId + clientId: ^_^clientId^_^ + # protocolVersion + protocolVersion: ^_^protocolVersion^_^ + # username + username: ^_^username^_^ + # password + password: ^_^password^_^ + # testMessage + testMessage: ^_^testMessage^_^ + + diff --git a/material/licenses/collector/LICENSE b/material/licenses/collector/LICENSE index 1fbaa5618eb..2620a42bd70 100644 --- a/material/licenses/collector/LICENSE +++ b/material/licenses/collector/LICENSE @@ -324,6 +324,7 @@ The text of each license is the standard Apache 2.0 license. https://mvnrepository.com/artifact/org.springframework/spring-webmvc/6.1.4 Apache-2.0 https://mvnrepository.com/artifact/org.apache.tomcat.embed/tomcat-embed-core/10.1.19 Apache-2.0 https://mvnrepository.com/artifact/com.vesoft/client/3.6.0 Apache-2.0 + https://mvnrepository.com/artifact/com.hivemq/hivemq-mqtt-client/1.3.3 Apache-2.0 ======================================================================== @@ -374,9 +375,9 @@ The text of each license is also included in licenses/LICENSE-[project].txt. https://mvnrepository.com/artifact/jakarta.persistence/jakarta.persistence-api/3.1.0 https://mvnrepository.com/artifact/jakarta.annotation/jakarta.annotation-api/2.1.1 https://mvnrepository.com/artifact/org.aspectj/aspectjweaver/1.9.21 - https://mvnrepository.com/artifact/org.eclipse.persistence/org.eclipse.persistence.asm/9.5.0 - https://mvnrepository.com/artifact/org.eclipse.persistence/org.eclipse.persistence.core/4.0.2 - https://mvnrepository.com/artifact/org.eclipse.persistence/org.eclipse.persistence.jpa/4.0.2 + https://mvnrepository.com/artifact/org.eclipse.persistence/org.eclipse.persistence.asm/9.5.0 + https://mvnrepository.com/artifact/org.eclipse.persistence/org.eclipse.persistence.core/4.0.2 + https://mvnrepository.com/artifact/org.eclipse.persistence/org.eclipse.persistence.jpa/4.0.2 https://mvnrepository.com/artifact/org.eclipse.persistence/org.eclipse.persistence.jpa.jpql/4.0.2