Skip to content

Commit

Permalink
1 功能完善
Browse files Browse the repository at this point in the history
支持rocketmq 4.7.1
broker数据统计支持addr(一台机器可以部署多个broker)。
支持broker存储数据远程迁移,校验。
客户端启动抓取配置报错日志完善
增加生产消费micrometer统计
支持broker主从同步数据监控预警
服务器链接支持链接池
消费失败的消息,邮件提醒支持一键跳过
审核列表支持分页展示
topic名等支持点击复制
bootstrap升级为3.4.1
增加纯go客户端接入wiki
增加广播模式重置偏移量wiki

2 bug修复
fix统计获取生产者组为空的bug
fix收集任务异常终止的bug。
fix广播消费者重复监控的bug
fix本地更新broker的bug
fix趋势图时间展示不全的bug
fix消费者创建失败导致数据不完全的bug
fix broker监控未按时执行的bug
fix趋势图单位显示bug
fix消息过长撑爆td的bug
  • Loading branch information
gaoyf committed Apr 8, 2021
1 parent b5e7bbb commit 32b0bfd
Show file tree
Hide file tree
Showing 184 changed files with 12,950 additions and 7,741 deletions.
6 changes: 3 additions & 3 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,11 @@ pom.xml.versionsBackup
.project
.classpath
.settings
.metadata
.cache
.metadata
.cache

!**/msg-type/*.class

# ignore idea files
.idea
*.iml
*.iml
12 changes: 11 additions & 1 deletion mq-client-common-open/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,15 @@
<parent>
<groupId>com.sohu.tv</groupId>
<artifactId>mq</artifactId>
<version>4.6.5</version>
<version>4.7.1</version>
</parent>

<artifactId>mq-client-common-open</artifactId>
<packaging>jar</packaging>

<properties>
<micrometer.version>1.3.11</micrometer.version>
</properties>

<dependencies>
<dependency>
Expand All @@ -34,6 +38,12 @@
<artifactId>hystrix-core</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>io.micrometer</groupId>
<artifactId>micrometer-core</artifactId>
<version>${micrometer.version}</version>
<optional>true</optional>
</dependency>
</dependencies>

<build>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

import java.net.HttpURLConnection;
import java.util.ArrayList;
import java.util.List;
import java.util.List;

import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.client.ClientConfig;
Expand Down Expand Up @@ -126,7 +126,8 @@ protected void init() {
logger.warn("please register your {}:{} topic:{} in MQCloud first, times:{}",
role() == 1 ? "producer" : "consumer", group, topic, times++);
} else {
logger.warn("fetch cluster info err:{}, times:{}", clusterInfoDTOResult.getMessage(), times++);
logger.warn("fetch topic:{} group:{} cluster info err:{}, times:{}", getTopic(), group,
clusterInfoDTOResult.getMessage(), times++);
}
try {
Thread.sleep(1000);
Expand Down Expand Up @@ -235,7 +236,7 @@ protected void initTrace() {
traceRocketMQProducer.getProducer().setSendMsgTimeout(5000);
traceRocketMQProducer.getProducer().setMaxMessageSize(traceDispatcher.getMaxMsgSize() - 10 * 1000);
traceRocketMQProducer.setMqCloudDomain(mqCloudDomain);
traceRocketMQProducer.setInstanceName(getGroup());
traceRocketMQProducer.setInstanceName(getGroup());
// 启动trace producer
traceRocketMQProducer.start();
// 赋给TraceDispatcher
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package com.sohu.tv.mq.common;

/**
* 消费异常
*
* @author yongfeigao
* @date 2020年12月21日
*/
public class ConsumeException extends Exception {
private static final long serialVersionUID = 3662764764579878687L;

public ConsumeException() {
super();
}

public ConsumeException(String message, Throwable cause, boolean enableSuppression, boolean writableStackTrace) {
super(message, cause, enableSuppression, writableStackTrace);
}

public ConsumeException(String message, Throwable cause) {
super(message, cause);
}

public ConsumeException(String message) {
super(message);
}

public ConsumeException(Throwable cause) {
super(cause);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,18 +18,18 @@
*/
public class SohuSendMessageHook implements SendMessageHook {
private final Logger logger = LoggerFactory.getLogger(this.getClass());

// 统计助手
private StatsHelper statsHelper;

public SohuSendMessageHook(DefaultMQProducer producer) {
statsHelper = new StatsHelper();
// 获取生产者group
statsHelper.setProducer(producer.getProducerGroup());
// 最大耗时,延后500毫秒
statsHelper.init(producer.getSendMsgTimeout() + 500);
// 客户端id
statsHelper.setClientId(buildMQClientId(producer));
// 获取生产者group
statsHelper.setProducer(producer.getProducerGroup());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
package com.sohu.tv.mq.metric;

import java.io.Serializable;
/**
* mq指标
*
* @author yongfeigao
* @date 2020年12月22日
*/
public class MQMetrics implements Serializable {
private static final long serialVersionUID = 738296259571004441L;
// 最大耗时
private int maxTime;
// 总耗时
private long totalTime;
// 调用次数
private int totalCount;
// 异常次数
private int exceptionCount;
// 生产或消费组
private String group;

public int getMaxTime() {
return maxTime;
}

public MQMetrics setMaxTime(int maxTime) {
if (maxTime > this.maxTime) {
this.maxTime = maxTime;
}
return this;
}

public long getTotalTime() {
return totalTime;
}

public void setTotalTime(long totalTime) {
this.totalTime = totalTime;
}

public MQMetrics addTotalTime(long totalTime) {
this.totalTime += totalTime;
return this;
}

public int getTotalCount() {
return totalCount;
}

public void setTotalCount(int totalCount) {
this.totalCount = totalCount;
}

public MQMetrics addTotalCount(int totalCount) {
this.totalCount += totalCount;
return this;
}

public int getExceptionCount() {
return exceptionCount;
}

public void setExceptionCount(int exceptionCount) {
this.exceptionCount = exceptionCount;
}

public MQMetrics addExceptionCount(int exceptionCount) {
this.exceptionCount += exceptionCount;
return this;
}

public String getGroup() {
return group;
}

public void setGroup(String group) {
this.group = group;
}

@Override
public String toString() {
return "MQMetrics [maxTime=" + maxTime + ", totalTime=" + totalTime + ", totalCount=" + totalCount
+ ", exceptionCount=" + exceptionCount + ", group=" + group + "]";
}
}
Loading

0 comments on commit 32b0bfd

Please sign in to comment.