diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkService.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkService.java index ea46dc3432d..a0b6a29443b 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkService.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkService.java @@ -115,6 +115,15 @@ public interface StreamSinkService { */ PageResult listByCondition(SinkPageRequest request, String operator); + /** + * Paging query stream sink detail info based on conditions. + * + * @param request paging request + * @param operator operator + * @return sink detail page list + */ + PageResult> listDetail(SinkPageRequest request, String operator); + /** * Paging query stream sink info based on conditions. * diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkServiceImpl.java index 21804003056..eaca5ff775d 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkServiceImpl.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkServiceImpl.java @@ -17,6 +17,8 @@ package org.apache.inlong.manager.service.sink; +import org.apache.inlong.common.constant.Constants; +import org.apache.inlong.common.constant.MQType; import org.apache.inlong.manager.common.consts.InlongConstants; import org.apache.inlong.manager.common.enums.ErrorCodeEnum; import org.apache.inlong.manager.common.enums.OperationTarget; @@ -25,21 +27,31 @@ import org.apache.inlong.manager.common.enums.TenantUserTypeEnum; import org.apache.inlong.manager.common.exceptions.BusinessException; import org.apache.inlong.manager.common.util.CommonBeanUtils; +import org.apache.inlong.manager.common.util.JsonUtils; import org.apache.inlong.manager.common.util.Preconditions; +import org.apache.inlong.manager.dao.entity.InlongClusterEntity; import org.apache.inlong.manager.dao.entity.InlongGroupEntity; import org.apache.inlong.manager.dao.entity.InlongStreamEntity; +import org.apache.inlong.manager.dao.entity.SortConfigEntity; import org.apache.inlong.manager.dao.entity.StreamSinkEntity; import org.apache.inlong.manager.dao.entity.StreamSinkFieldEntity; +import org.apache.inlong.manager.dao.mapper.InlongClusterEntityMapper; import org.apache.inlong.manager.dao.mapper.InlongGroupEntityMapper; import org.apache.inlong.manager.dao.mapper.InlongStreamEntityMapper; +import org.apache.inlong.manager.dao.mapper.SortConfigEntityMapper; import org.apache.inlong.manager.dao.mapper.StreamSinkEntityMapper; import org.apache.inlong.manager.dao.mapper.StreamSinkFieldEntityMapper; +import org.apache.inlong.manager.pojo.cluster.pulsar.PulsarClusterDTO; import org.apache.inlong.manager.pojo.common.BatchResult; import org.apache.inlong.manager.pojo.common.OrderFieldEnum; import org.apache.inlong.manager.pojo.common.OrderTypeEnum; import org.apache.inlong.manager.pojo.common.PageResult; import org.apache.inlong.manager.pojo.common.UpdateResult; import org.apache.inlong.manager.pojo.group.InlongGroupInfo; +import org.apache.inlong.manager.pojo.group.kafka.InlongKafkaInfo; +import org.apache.inlong.manager.pojo.group.pulsar.InlongPulsarDTO; +import org.apache.inlong.manager.pojo.group.pulsar.InlongPulsarInfo; +import org.apache.inlong.manager.pojo.group.tubemq.InlongTubeMQInfo; import org.apache.inlong.manager.pojo.sink.ParseFieldRequest; import org.apache.inlong.manager.pojo.sink.SinkApproveDTO; import org.apache.inlong.manager.pojo.sink.SinkBriefInfo; @@ -93,6 +105,8 @@ import static org.apache.inlong.manager.common.consts.InlongConstants.STATEMENT_TYPE_CSV; import static org.apache.inlong.manager.common.consts.InlongConstants.STATEMENT_TYPE_JSON; import static org.apache.inlong.manager.common.consts.InlongConstants.STATEMENT_TYPE_SQL; +import static org.apache.inlong.manager.service.resource.queue.pulsar.PulsarQueueResourceOperator.PULSAR_SUBSCRIPTION; +import static org.apache.inlong.manager.service.resource.queue.tubemq.TubeMQQueueResourceOperator.TUBE_CONSUMER_GROUP; /** * Implementation of sink service interface @@ -104,7 +118,10 @@ public class StreamSinkServiceImpl implements StreamSinkService { private static final Pattern PARSE_FIELD_CSV_SPLITTER = Pattern.compile("[\t\\s,]"); private static final int PARSE_FIELD_CSV_MAX_COLUMNS = 3; private static final int PARSE_FIELD_CSV_MIN_COLUMNS = 2; - + @Autowired + private SortConfigEntityMapper sortConfigEntityMapper; + @Autowired + private InlongClusterEntityMapper clusterEntityMapper; @Autowired private SinkOperatorFactory operatorFactory; @Autowired @@ -121,7 +138,6 @@ public class StreamSinkServiceImpl implements StreamSinkService { private AutowireCapableBeanFactory autowireCapableBeanFactory; @Autowired private ObjectMapper objectMapper; - // To avoid circular dependencies, you cannot use @Autowired, it will be injected by AutowireCapableBeanFactory private InlongStreamProcessService streamProcessOperation; @@ -297,6 +313,89 @@ public PageResult listByCondition(SinkPageRequest request, return pageResult; } + @Override + public PageResult> listDetail(SinkPageRequest request, String operator) { + PageHelper.startPage(request.getPageNum(), request.getPageSize()); + OrderFieldEnum.checkOrderField(request); + OrderTypeEnum.checkOrderType(request); + Page entityPage = (Page) sinkMapper.selectByCondition(request); + InlongGroupEntity groupEntity = groupMapper.selectByGroupId(request.getInlongGroupId()); + InlongGroupInfo groupInfo = null; + switch (groupEntity.getMqType()) { + case MQType.PULSAR: + groupInfo = CommonBeanUtils.copyProperties(groupEntity, InlongPulsarInfo::new, true); + break; + case MQType.TUBEMQ: + groupInfo = CommonBeanUtils.copyProperties(groupEntity, InlongTubeMQInfo::new, true); + break; + case MQType.KAFKA: + groupInfo = CommonBeanUtils.copyProperties(groupEntity, InlongKafkaInfo::new, true); + default: + throw new BusinessException(ErrorCodeEnum.MQ_TYPE_NOT_SUPPORTED.getMessage()); + } + InlongGroupInfo finalGroupInfo = groupInfo; + List> responseList = entityPage.stream().map(sink -> { + StreamSinkOperator sinkOperator = operatorFactory.getInstance(sink.getSinkType()); + StreamSink streamSink = sinkOperator.getFromEntity(sink); + Map requestMap = JsonUtils.OBJECT_MAPPER.convertValue(streamSink, + new TypeReference>() { + }); + InlongStreamEntity streamEntity = + streamMapper.selectByIdentifier(request.getInlongGroupId(), sink.getInlongStreamId()); + String topic = ""; + String consumeGroup = ""; + switch (groupEntity.getMqType()) { + case MQType.PULSAR: + List pulsarClusters = clusterEntityMapper.selectByKey( + finalGroupInfo.getInlongClusterTag(), null, MQType.PULSAR); + InlongPulsarDTO pulsarDTO = InlongPulsarDTO.getFromJson(groupEntity.getExtParams()); + if (CollectionUtils.isEmpty(pulsarClusters)) { + break; + } + String tenant = pulsarDTO.getPulsarTenant(); + if (StringUtils.isBlank(tenant)) { + InlongClusterEntity pulsarCluster = pulsarClusters.get(0); + // Multiple adminUrls should be configured for pulsar, + // otherwise all requests will be sent to the same broker + PulsarClusterDTO pulsarClusterDTO = PulsarClusterDTO.getFromJson(pulsarCluster.getExtParams()); + tenant = pulsarClusterDTO.getPulsarTenant(); + } + String fullTopicName = + tenant + "/" + finalGroupInfo.getMqResource() + "/" + streamEntity.getMqResource(); + topic = "persistent://" + fullTopicName; + consumeGroup = String.format(PULSAR_SUBSCRIPTION, finalGroupInfo.getInlongClusterTag(), + fullTopicName, sink.getId()); + break; + case MQType.TUBEMQ: + topic = streamEntity.getMqResource(); + consumeGroup = String.format(TUBE_CONSUMER_GROUP, groupEntity.getInlongClusterTag(), topic, + sink.getId()); + break; + case MQType.KAFKA: + topic = streamEntity.getMqResource(); + if (topic.equals(streamEntity.getInlongStreamId())) { + // the default mq resource (stream id) is not sufficient to discriminate different kafka topics + topic = String.format(Constants.DEFAULT_KAFKA_TOPIC_FORMAT, + finalGroupInfo.getMqResource(), streamEntity.getMqResource()); + } + break; + default: + throw new BusinessException(ErrorCodeEnum.MQ_TYPE_NOT_SUPPORTED.getMessage()); + } + requestMap.put("topic", topic); + requestMap.put("consumerGroup", consumeGroup); + SortConfigEntity sortConfigEntity = sortConfigEntityMapper.selectBySinkId(sink.getId()); + if (sortConfigEntity != null) { + requestMap.put("dataFlowInfo", sortConfigEntity.getConfigParams()); + } + return requestMap; + }).collect(Collectors.toList()); + PageResult> pageResult = new PageResult<>(responseList, entityPage.getTotal(), + entityPage.getPageNum(), entityPage.getPageSize()); + LOGGER.debug("success to list sink page, result size {}", pageResult.getList().size()); + return pageResult; + } + @Override public List listByCondition(SinkPageRequest request, UserInfo opInfo) { // check sink id diff --git a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/StreamSinkController.java b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/StreamSinkController.java index 4fd4eaabb60..b7b8dbd5d40 100644 --- a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/StreamSinkController.java +++ b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/StreamSinkController.java @@ -48,6 +48,7 @@ import org.springframework.web.bind.annotation.RestController; import java.util.List; +import java.util.Map; /** * Stream sink control layer @@ -88,6 +89,12 @@ public Response> listByCondition(@RequestBody S return Response.success(sinkService.listByCondition(request, LoginUserUtils.getLoginUser().getName())); } + @RequestMapping(value = "/sink/listDetail", method = RequestMethod.POST) + @ApiOperation(value = "List stream sinks detail by paginating") + public Response>> listDetail(@RequestBody SinkPageRequest request) { + return Response.success(sinkService.listDetail(request, LoginUserUtils.getLoginUser().getName())); + } + @RequestMapping(value = "/sink/update", method = RequestMethod.POST) @OperationLog(operation = OperationType.UPDATE, operationTarget = OperationTarget.SINK) @ApiOperation(value = "Update stream sink")