Skip to content

Commit

Permalink
[INLONG-10842][Manager] Fix the problem of tube cluster address not o…
Browse files Browse the repository at this point in the history
…btained when obtaining consumer group information
  • Loading branch information
fuweng11 committed Aug 21, 2024
1 parent fb395fe commit 9e5e6f4
Showing 1 changed file with 13 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,20 @@
package org.apache.inlong.manager.service.consume;

import org.apache.inlong.common.constant.MQType;
import org.apache.inlong.manager.common.enums.ClusterType;
import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
import org.apache.inlong.manager.common.util.CommonBeanUtils;
import org.apache.inlong.manager.common.util.Preconditions;
import org.apache.inlong.manager.dao.entity.InlongConsumeEntity;
import org.apache.inlong.manager.pojo.cluster.ClusterInfo;
import org.apache.inlong.manager.pojo.consume.InlongConsumeInfo;
import org.apache.inlong.manager.pojo.consume.InlongConsumeRequest;
import org.apache.inlong.manager.pojo.consume.pulsar.ConsumePulsarInfo;
import org.apache.inlong.manager.pojo.consume.tubemq.ConsumeTubeMQDTO;
import org.apache.inlong.manager.pojo.group.InlongGroupInfo;
import org.apache.inlong.manager.pojo.group.InlongGroupTopicInfo;
import org.apache.inlong.manager.pojo.group.tubemq.InlongTubeMQTopicInfo;
import org.apache.inlong.manager.service.cluster.InlongClusterService;
import org.apache.inlong.manager.service.group.InlongGroupService;

import org.apache.commons.lang3.StringUtils;
Expand All @@ -36,6 +40,7 @@
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import java.util.List;
import java.util.Objects;

/**
Expand All @@ -48,6 +53,8 @@ public class ConsumeTubeMQOperator extends AbstractConsumeOperator {

@Autowired
private InlongGroupService groupService;
@Autowired
private InlongClusterService clusterService;

@Override
public Boolean accept(String mqType) {
Expand Down Expand Up @@ -81,7 +88,12 @@ public InlongConsumeInfo getFromEntity(InlongConsumeEntity entity) {
ConsumeTubeMQDTO dto = ConsumeTubeMQDTO.getFromJson(entity.getExtParams());
CommonBeanUtils.copyProperties(dto, consumeInfo);
}

InlongGroupInfo groupInfo = groupService.get(entity.getInlongGroupId());
List<ClusterInfo> clusterInfos =
clusterService.listByTagAndType(groupInfo.getInlongClusterTag(), ClusterType.TUBEMQ);
Preconditions.expectNotEmpty(clusterInfos,
"tubeMQ cluster not exist for groupId=" + groupInfo.getInlongGroupId());
consumeInfo.setClusterInfos(clusterInfos);
return consumeInfo;
}

Expand Down

0 comments on commit 9e5e6f4

Please sign in to comment.