Skip to content

Commit

Permalink
修复CI问题
Browse files Browse the repository at this point in the history
  • Loading branch information
chengyouling committed Nov 7, 2024
1 parent 6f87b6f commit 18a1251
Show file tree
Hide file tree
Showing 7 changed files with 23 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -46,4 +46,4 @@
</plugins>
</build>

</project>
</project>
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,10 @@ public class RocketMqConsumerController {
@Value("${rocketmq.topic}")
private String topic;

private final int maxReconsumeTimes = 10000;

private final int maxNums = 32;

/**
* clear cache count info
*
Expand All @@ -77,20 +81,18 @@ public Map<String, Object> getMessageCount() {
if (pullConsumer == null) {
pullConsumer = new DefaultMQPullConsumer("default");
pullConsumer.setNamesrvAddr(nameServer);
int maxReconsumeTimes = 10000;
pullConsumer.setMaxReconsumeTimes(maxReconsumeTimes);
pullConsumer.start();
}
Set<MessageQueue> messageQueues = pullConsumer.fetchSubscribeMessageQueues(topic);
if (messageQueues.isEmpty()) {
return new HashMap<>();
}
int maxNums = 32;
for (MessageQueue mq : messageQueues) {
PullResultExt pullResult = (PullResultExt) pullConsumer.pullBlockIfNotFound(mq, null,
getMessageQueueOffSet(mq), maxNums);
putMessageQueueOffSet(mq, pullResult);
if (PullStatus.FOUND == pullResult.getPullStatus()) {
if (pullResult.getPullStatus() == PullStatus.FOUND) {
List<MessageExt> messageExts = pullResult.getMsgFoundList();
for (MessageExt messageExt: messageExts) {
RocketMqMessageUtils.convertMessageCount(messageExt);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,11 @@ public class RocketMqMessageUtils {
private RocketMqMessageUtils() {
}

/**
* convert base/gray message
*
* @param messageExt MessageExt
*/
public static void convertMessageCount(MessageExt messageExt) {
if (messageExt.getProperties() != null && GRAY.equals(messageExt.getProperties().get(
"x_lane_canary"))) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,4 +37,4 @@
</dependency>
</dependencies>

</project>
</project>
Original file line number Diff line number Diff line change
Expand Up @@ -46,4 +46,4 @@
</plugins>
</build>

</project>
</project>
Original file line number Diff line number Diff line change
Expand Up @@ -48,17 +48,18 @@ public class MqProducerController {

private DefaultMQProducer defaultMqProducer;

/**
* producer message
*
* @param message message
* @return is success
*/
@GetMapping("/producerMessage")
private final int sendMsgTimeout = 60000;

/**
* producer message
*
* @param message message
* @return is success
*/
@GetMapping("/producerMessage")
public String producerMessage(@RequestParam("message") String message) {
try {
if (defaultMqProducer == null) {
int sendMsgTimeout = 60000;
defaultMqProducer = new DefaultMQProducer("default");
defaultMqProducer.setNamesrvAddr(mqAddress);
defaultMqProducer.setSendMsgTimeout(sendMsgTimeout);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,4 +58,4 @@
<module>grayscale-rocketmq-consumer-demo</module>
</modules>

</project>
</project>

0 comments on commit 18a1251

Please sign in to comment.