Skip to content

Commit

Permalink
[INLONG-10911][Manager] Support pagination to query sort task details…
Browse files Browse the repository at this point in the history
… information
  • Loading branch information
fuweng11 committed Aug 27, 2024
1 parent d90b249 commit 0beb7ad
Show file tree
Hide file tree
Showing 3 changed files with 117 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,15 @@ public interface StreamSinkService {
*/
PageResult<? extends StreamSink> listByCondition(SinkPageRequest request, String operator);

/**
* Paging query stream sink detail info based on conditions.
*
* @param request paging request
* @param operator operator
* @return sink detail page list
*/
PageResult<Map<String, Object>> listDetail(SinkPageRequest request, String operator);

/**
* Paging query stream sink info based on conditions.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

package org.apache.inlong.manager.service.sink;

import org.apache.inlong.common.constant.Constants;
import org.apache.inlong.common.constant.MQType;
import org.apache.inlong.manager.common.consts.InlongConstants;
import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
import org.apache.inlong.manager.common.enums.OperationTarget;
Expand All @@ -25,21 +27,31 @@
import org.apache.inlong.manager.common.enums.TenantUserTypeEnum;
import org.apache.inlong.manager.common.exceptions.BusinessException;
import org.apache.inlong.manager.common.util.CommonBeanUtils;
import org.apache.inlong.manager.common.util.JsonUtils;
import org.apache.inlong.manager.common.util.Preconditions;
import org.apache.inlong.manager.dao.entity.InlongClusterEntity;
import org.apache.inlong.manager.dao.entity.InlongGroupEntity;
import org.apache.inlong.manager.dao.entity.InlongStreamEntity;
import org.apache.inlong.manager.dao.entity.SortConfigEntity;
import org.apache.inlong.manager.dao.entity.StreamSinkEntity;
import org.apache.inlong.manager.dao.entity.StreamSinkFieldEntity;
import org.apache.inlong.manager.dao.mapper.InlongClusterEntityMapper;
import org.apache.inlong.manager.dao.mapper.InlongGroupEntityMapper;
import org.apache.inlong.manager.dao.mapper.InlongStreamEntityMapper;
import org.apache.inlong.manager.dao.mapper.SortConfigEntityMapper;
import org.apache.inlong.manager.dao.mapper.StreamSinkEntityMapper;
import org.apache.inlong.manager.dao.mapper.StreamSinkFieldEntityMapper;
import org.apache.inlong.manager.pojo.cluster.pulsar.PulsarClusterDTO;
import org.apache.inlong.manager.pojo.common.BatchResult;
import org.apache.inlong.manager.pojo.common.OrderFieldEnum;
import org.apache.inlong.manager.pojo.common.OrderTypeEnum;
import org.apache.inlong.manager.pojo.common.PageResult;
import org.apache.inlong.manager.pojo.common.UpdateResult;
import org.apache.inlong.manager.pojo.group.InlongGroupInfo;
import org.apache.inlong.manager.pojo.group.kafka.InlongKafkaInfo;
import org.apache.inlong.manager.pojo.group.pulsar.InlongPulsarDTO;
import org.apache.inlong.manager.pojo.group.pulsar.InlongPulsarInfo;
import org.apache.inlong.manager.pojo.group.tubemq.InlongTubeMQInfo;
import org.apache.inlong.manager.pojo.sink.ParseFieldRequest;
import org.apache.inlong.manager.pojo.sink.SinkApproveDTO;
import org.apache.inlong.manager.pojo.sink.SinkBriefInfo;
Expand Down Expand Up @@ -93,6 +105,8 @@
import static org.apache.inlong.manager.common.consts.InlongConstants.STATEMENT_TYPE_CSV;
import static org.apache.inlong.manager.common.consts.InlongConstants.STATEMENT_TYPE_JSON;
import static org.apache.inlong.manager.common.consts.InlongConstants.STATEMENT_TYPE_SQL;
import static org.apache.inlong.manager.service.resource.queue.pulsar.PulsarQueueResourceOperator.PULSAR_SUBSCRIPTION;
import static org.apache.inlong.manager.service.resource.queue.tubemq.TubeMQQueueResourceOperator.TUBE_CONSUMER_GROUP;

/**
* Implementation of sink service interface
Expand All @@ -104,7 +118,10 @@ public class StreamSinkServiceImpl implements StreamSinkService {
private static final Pattern PARSE_FIELD_CSV_SPLITTER = Pattern.compile("[\t\\s,]");
private static final int PARSE_FIELD_CSV_MAX_COLUMNS = 3;
private static final int PARSE_FIELD_CSV_MIN_COLUMNS = 2;

@Autowired
private SortConfigEntityMapper sortConfigEntityMapper;
@Autowired
private InlongClusterEntityMapper clusterEntityMapper;
@Autowired
private SinkOperatorFactory operatorFactory;
@Autowired
Expand All @@ -121,7 +138,6 @@ public class StreamSinkServiceImpl implements StreamSinkService {
private AutowireCapableBeanFactory autowireCapableBeanFactory;
@Autowired
private ObjectMapper objectMapper;

// To avoid circular dependencies, you cannot use @Autowired, it will be injected by AutowireCapableBeanFactory
private InlongStreamProcessService streamProcessOperation;

Expand Down Expand Up @@ -297,6 +313,89 @@ public PageResult<? extends StreamSink> listByCondition(SinkPageRequest request,
return pageResult;
}

@Override
public PageResult<Map<String, Object>> listDetail(SinkPageRequest request, String operator) {
PageHelper.startPage(request.getPageNum(), request.getPageSize());
OrderFieldEnum.checkOrderField(request);
OrderTypeEnum.checkOrderType(request);
Page<StreamSinkEntity> entityPage = (Page<StreamSinkEntity>) sinkMapper.selectByCondition(request);
InlongGroupEntity groupEntity = groupMapper.selectByGroupId(request.getInlongGroupId());
InlongGroupInfo groupInfo = null;
switch (groupEntity.getMqType()) {
case MQType.PULSAR:
groupInfo = CommonBeanUtils.copyProperties(groupEntity, InlongPulsarInfo::new, true);
break;
case MQType.TUBEMQ:
groupInfo = CommonBeanUtils.copyProperties(groupEntity, InlongTubeMQInfo::new, true);
break;
case MQType.KAFKA:
groupInfo = CommonBeanUtils.copyProperties(groupEntity, InlongKafkaInfo::new, true);
default:
throw new BusinessException(ErrorCodeEnum.MQ_TYPE_NOT_SUPPORTED.getMessage());
}
InlongGroupInfo finalGroupInfo = groupInfo;
List<Map<String, Object>> responseList = entityPage.stream().map(sink -> {
StreamSinkOperator sinkOperator = operatorFactory.getInstance(sink.getSinkType());
StreamSink streamSink = sinkOperator.getFromEntity(sink);
Map<String, Object> requestMap = JsonUtils.OBJECT_MAPPER.convertValue(streamSink,
new TypeReference<Map<String, Object>>() {
});
InlongStreamEntity streamEntity =
streamMapper.selectByIdentifier(request.getInlongGroupId(), sink.getInlongStreamId());
String topic = "";
String consumeGroup = "";
switch (groupEntity.getMqType()) {
case MQType.PULSAR:
List<InlongClusterEntity> pulsarClusters = clusterEntityMapper.selectByKey(
finalGroupInfo.getInlongClusterTag(), null, MQType.PULSAR);
InlongPulsarDTO pulsarDTO = InlongPulsarDTO.getFromJson(groupEntity.getExtParams());
if (CollectionUtils.isEmpty(pulsarClusters)) {
break;
}
String tenant = pulsarDTO.getPulsarTenant();
if (StringUtils.isBlank(tenant)) {
InlongClusterEntity pulsarCluster = pulsarClusters.get(0);
// Multiple adminUrls should be configured for pulsar,
// otherwise all requests will be sent to the same broker
PulsarClusterDTO pulsarClusterDTO = PulsarClusterDTO.getFromJson(pulsarCluster.getExtParams());
tenant = pulsarClusterDTO.getPulsarTenant();
}
String fullTopicName =
tenant + "/" + finalGroupInfo.getMqResource() + "/" + streamEntity.getMqResource();
topic = "persistent://" + fullTopicName;
consumeGroup = String.format(PULSAR_SUBSCRIPTION, finalGroupInfo.getInlongClusterTag(),
fullTopicName, sink.getId());
break;
case MQType.TUBEMQ:
topic = streamEntity.getMqResource();
consumeGroup = String.format(TUBE_CONSUMER_GROUP, groupEntity.getInlongClusterTag(), topic,
sink.getId());
break;
case MQType.KAFKA:
topic = streamEntity.getMqResource();
if (topic.equals(streamEntity.getInlongStreamId())) {
// the default mq resource (stream id) is not sufficient to discriminate different kafka topics
topic = String.format(Constants.DEFAULT_KAFKA_TOPIC_FORMAT,
finalGroupInfo.getMqResource(), streamEntity.getMqResource());
}
break;
default:
throw new BusinessException(ErrorCodeEnum.MQ_TYPE_NOT_SUPPORTED.getMessage());
}
requestMap.put("topic", topic);
requestMap.put("consumerGroup", consumeGroup);
SortConfigEntity sortConfigEntity = sortConfigEntityMapper.selectBySinkId(sink.getId());
if (sortConfigEntity != null) {
requestMap.put("dataFlowInfo", sortConfigEntity.getConfigParams());
}
return requestMap;
}).collect(Collectors.toList());
PageResult<Map<String, Object>> pageResult = new PageResult<>(responseList, entityPage.getTotal(),
entityPage.getPageNum(), entityPage.getPageSize());
LOGGER.debug("success to list sink page, result size {}", pageResult.getList().size());
return pageResult;
}

@Override
public List<? extends StreamSink> listByCondition(SinkPageRequest request, UserInfo opInfo) {
// check sink id
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
import org.springframework.web.bind.annotation.RestController;

import java.util.List;
import java.util.Map;

/**
* Stream sink control layer
Expand Down Expand Up @@ -88,6 +89,12 @@ public Response<PageResult<? extends StreamSink>> listByCondition(@RequestBody S
return Response.success(sinkService.listByCondition(request, LoginUserUtils.getLoginUser().getName()));
}

@RequestMapping(value = "/sink/listDetail", method = RequestMethod.POST)
@ApiOperation(value = "List stream sinks detail by paginating")
public Response<PageResult<Map<String, Object>>> listDetail(@RequestBody SinkPageRequest request) {
return Response.success(sinkService.listDetail(request, LoginUserUtils.getLoginUser().getName()));
}

@RequestMapping(value = "/sink/update", method = RequestMethod.POST)
@OperationLog(operation = OperationType.UPDATE, operationTarget = OperationTarget.SINK)
@ApiOperation(value = "Update stream sink")
Expand Down

0 comments on commit 0beb7ad

Please sign in to comment.