From aedb24956280bdd0c6825521df0abf31378f1dd2 Mon Sep 17 00:00:00 2001 From: aloyszhang Date: Fri, 21 Jun 2024 10:32:19 +0800 Subject: [PATCH 1/6] [INLONG-10459][Manager] Support schedule instance callback to submit Flink batch job --- .../plugin/listener/StartupSortListener.java | 38 ++----------------- .../offline/FlinkOfflineJobOperator.java | 36 ++++++++++++++++++ .../manager/plugin/util/FlinkUtils.java | 32 ++++++++++++++++ .../service/group/InlongGroupService.java | 7 ++++ .../service/group/InlongGroupServiceImpl.java | 18 +++++++++ .../listener/sort/SortConfigListener.java | 8 +--- .../service/schedule/ScheduleOperator.java | 11 ++++++ .../schedule/ScheduleOperatorImpl.java | 25 ++++++++++-- .../web/controller/InlongGroupController.java | 7 ++++ .../processor/OfflineJobOperator.java | 27 +++++++++++++ 10 files changed, 165 insertions(+), 44 deletions(-) create mode 100644 inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/offline/FlinkOfflineJobOperator.java create mode 100644 inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/processor/OfflineJobOperator.java diff --git a/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/listener/StartupSortListener.java b/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/listener/StartupSortListener.java index b0624fbe662..5d1dd82578d 100644 --- a/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/listener/StartupSortListener.java +++ b/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/listener/StartupSortListener.java @@ -20,8 +20,6 @@ import org.apache.inlong.manager.common.consts.InlongConstants; import org.apache.inlong.manager.common.enums.GroupOperateType; import org.apache.inlong.manager.common.enums.TaskEvent; -import org.apache.inlong.manager.plugin.util.FlinkUtils; -import org.apache.inlong.manager.pojo.group.InlongGroupInfo; import org.apache.inlong.manager.pojo.stream.InlongStreamExtInfo; import org.apache.inlong.manager.pojo.stream.InlongStreamInfo; import org.apache.inlong.manager.pojo.workflow.form.process.GroupResourceProcessForm; @@ -32,9 +30,9 @@ import lombok.extern.slf4j.Slf4j; -import java.util.ArrayList; import java.util.List; -import java.util.stream.Collectors; + +import static org.apache.inlong.manager.plugin.util.FlinkUtils.submitFlinkJobs; /** * Listener of startup sort. @@ -62,8 +60,7 @@ public boolean accept(WorkflowContext workflowContext) { } log.info("add startup group listener for groupId [{}]", groupId); - return (InlongConstants.DATASYNC_REALTIME_MODE.equals(groupProcessForm.getGroupInfo().getInlongGroupMode()) - || InlongConstants.DATASYNC_OFFLINE_MODE.equals(groupProcessForm.getGroupInfo().getInlongGroupMode())); + return (InlongConstants.DATASYNC_REALTIME_MODE.equals(groupProcessForm.getGroupInfo().getInlongGroupMode())); } @Override @@ -77,36 +74,9 @@ public ListenerResult listen(WorkflowContext context) throws Exception { } GroupResourceProcessForm groupResourceForm = (GroupResourceProcessForm) processForm; - InlongGroupInfo groupInfo = groupResourceForm.getGroupInfo(); - // do not build sort config if the group mode is offline - if (InlongConstants.DATASYNC_OFFLINE_MODE.equals(groupInfo.getInlongGroupMode())) { - log.info("no need to launching sort job for groupId={} as the mode is offline", - groupId); - return ListenerResult.success(); - } List streamInfos = groupResourceForm.getStreamInfos(); - int sinkCount = streamInfos.stream() - .map(s -> s.getSinkList() == null ? 0 : s.getSinkList().size()) - .reduce(0, Integer::sum); - if (sinkCount == 0) { - log.warn("not any sink configured for group {}, skip launching sort job", groupId); - return ListenerResult.success(); - } - List listenerResults = new ArrayList<>(); - for (InlongStreamInfo streamInfo : streamInfos) { - listenerResults.add(FlinkUtils.submitFlinkJob(streamInfo, - FlinkUtils.genFlinkJobName(processForm, streamInfo))); - } - - // only one stream in group for now - // we can return the list of ListenerResult if support multi-stream in the future - List failedStreams = listenerResults.stream() - .filter(t -> !t.isSuccess()).collect(Collectors.toList()); - if (failedStreams.isEmpty()) { - return ListenerResult.success(); - } - return ListenerResult.fail(failedStreams.get(0).getRemark()); + return submitFlinkJobs(groupId, streamInfos); } /** diff --git a/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/offline/FlinkOfflineJobOperator.java b/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/offline/FlinkOfflineJobOperator.java new file mode 100644 index 00000000000..48d547a970c --- /dev/null +++ b/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/offline/FlinkOfflineJobOperator.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.manager.plugin.offline; + +import org.apache.inlong.manager.pojo.stream.InlongStreamInfo; +import org.apache.inlong.manager.workflow.processor.OfflineJobOperator; + +import org.springframework.stereotype.Component; + +import java.util.List; + +import static org.apache.inlong.manager.plugin.util.FlinkUtils.submitFlinkJobs; + +@Component +public class FlinkOfflineJobOperator implements OfflineJobOperator { + + @Override + public void submitOfflineJob(String groupId, List streamInfoList) throws Exception { + submitFlinkJobs(groupId, streamInfoList); + } +} diff --git a/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/util/FlinkUtils.java b/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/util/FlinkUtils.java index d4c7863371d..7e04c6c41e7 100644 --- a/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/util/FlinkUtils.java +++ b/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/util/FlinkUtils.java @@ -218,6 +218,33 @@ public static FlinkConfig getFlinkConfigFromFile() throws Exception { return flinkConfig; } + public static ListenerResult submitFlinkJobs(String groupId, List streamInfoList) + throws Exception { + int sinkCount = streamInfoList.stream() + .map(s -> s.getSinkList() == null ? 0 : s.getSinkList().size()) + .reduce(0, Integer::sum); + if (sinkCount == 0) { + log.warn("not any sink configured for group {} and stream list {}, skip launching sort job", groupId, + streamInfoList.stream() + .map(s -> s.getInlongGroupId() + ":" + s.getName()).collect(Collectors.toList())); + return ListenerResult.success(); + } + + List listenerResults = new ArrayList<>(); + for (InlongStreamInfo streamInfo : streamInfoList) { + listenerResults.add(FlinkUtils.submitFlinkJob(streamInfo, FlinkUtils.genFlinkJobName(streamInfo))); + } + + // only one stream in group for now + // we can return the list of ListenerResult if support multi-stream in the future + List failedStreams = listenerResults.stream() + .filter(t -> !t.isSuccess()).collect(Collectors.toList()); + if (failedStreams.isEmpty()) { + ListenerResult.success(); + } + return ListenerResult.fail(failedStreams.get(0).getRemark()); + } + public static ListenerResult submitFlinkJob(InlongStreamInfo streamInfo, String jobName) throws Exception { List sinkList = streamInfo.getSinkList(); List sinkTypes = sinkList.stream().map(StreamSink::getSinkType).collect(Collectors.toList()); @@ -295,4 +322,9 @@ public static String genFlinkJobName(ProcessForm processForm, InlongStreamInfo s return Constants.SORT_JOB_NAME_GENERATOR.apply(processForm) + InlongConstants.HYPHEN + streamInfo.getInlongStreamId(); } + + public static String genFlinkJobName(InlongStreamInfo streamInfo) { + return String.format(Constants.SORT_JOB_NAME_TEMPLATE, streamInfo.getInlongGroupId()) + InlongConstants.HYPHEN + + streamInfo.getInlongStreamId(); + } } diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupService.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupService.java index 4906e91aa7e..c6643c3259b 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupService.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupService.java @@ -218,4 +218,11 @@ void updateAfterApprove( */ List getGroupByBackUpClusterTag(String clusterTag); + /** + * Submitting offline job for the given group. + * @param groupId the inlong group to submit offline job + * + * */ + Boolean submitOfflineJob(String groupId); + } diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupServiceImpl.java index d42a61dbfb0..4430c9cdce3 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupServiceImpl.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupServiceImpl.java @@ -931,4 +931,22 @@ public List getGroupByBackUpClusterTag(String clusterTag) { return groupInfoList; } + @Override + public Boolean submitOfflineJob(String groupId) { + // 1. get stream info list + InlongGroupInfo groupInfo = get(groupId); + if (groupInfo == null) { + String msg = String.format("InLong group not found for groupId=%s", groupId); + LOGGER.error(msg); + throw new BusinessException(ErrorCodeEnum.GROUP_NOT_FOUND); + } + + List streamInfoList = streamService.list(groupId); + if (CollectionUtils.isEmpty(streamInfoList)) { + LOGGER.warn("No stream info found for group {}, skip submit offline job", groupId); + return false; + } + return scheduleOperator.submitOfflineJob(groupId, streamInfoList); + } + } diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/sort/SortConfigListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/sort/SortConfigListener.java index 067a320a495..5fadf2d82c8 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/sort/SortConfigListener.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/sort/SortConfigListener.java @@ -125,12 +125,6 @@ public ListenerResult listen(WorkflowContext context) throws WorkflowListenerExc try { for (InlongStreamInfo streamInfo : streamInfos) { - // do not build sort config if the group mode is offline - if (InlongConstants.DATASYNC_OFFLINE_MODE.equals(groupInfo.getInlongGroupMode())) { - LOGGER.info("no need to build sort config for groupId={} streamId={} as the mode is offline", - groupId, streamInfo.getInlongStreamId()); - continue; - } List sinkList = streamInfo.getSinkList(); if (CollectionUtils.isEmpty(sinkList)) { continue; @@ -144,7 +138,7 @@ public ListenerResult listen(WorkflowContext context) throws WorkflowListenerExc } } catch (Exception e) { String msg = String.format("failed to build sort config for groupId=%s, ", groupId); - LOGGER.error(msg + "streamInfos=" + streamInfos, e); + LOGGER.error("{} streamInfos={}", msg, streamInfos, e); throw new WorkflowListenerException(msg + e.getMessage()); } diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/schedule/ScheduleOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/schedule/ScheduleOperator.java index 6bf9c014326..653523294e1 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/schedule/ScheduleOperator.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/schedule/ScheduleOperator.java @@ -19,6 +19,9 @@ import org.apache.inlong.manager.pojo.schedule.ScheduleInfo; import org.apache.inlong.manager.pojo.schedule.ScheduleInfoRequest; +import org.apache.inlong.manager.pojo.stream.InlongStreamInfo; + +import java.util.List; /** * Operator for schedule. Including: @@ -92,4 +95,12 @@ public interface ScheduleOperator { * @Return whether succeed * */ Boolean handleGroupApprove(String groupId); + + /** + * Start offline sync job when the schedule instance callback. + * @param groupId groupId to start offline job + * @param streamInfoList stream list to start offline job + * @Return whether succeed + * */ + Boolean submitOfflineJob(String groupId, List streamInfoList); } diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/schedule/ScheduleOperatorImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/schedule/ScheduleOperatorImpl.java index 411a80766a8..8fbaf69ab8b 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/schedule/ScheduleOperatorImpl.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/schedule/ScheduleOperatorImpl.java @@ -18,14 +18,16 @@ package org.apache.inlong.manager.service.schedule; import org.apache.inlong.manager.common.consts.InlongConstants; +import org.apache.inlong.manager.common.exceptions.BusinessException; import org.apache.inlong.manager.common.util.CommonBeanUtils; import org.apache.inlong.manager.dao.entity.InlongGroupExtEntity; import org.apache.inlong.manager.dao.mapper.InlongGroupExtEntityMapper; -import org.apache.inlong.manager.dao.mapper.ScheduleEntityMapper; import org.apache.inlong.manager.pojo.schedule.ScheduleInfo; import org.apache.inlong.manager.pojo.schedule.ScheduleInfoRequest; +import org.apache.inlong.manager.pojo.stream.InlongStreamInfo; import org.apache.inlong.manager.schedule.ScheduleClientFactory; import org.apache.inlong.manager.schedule.ScheduleEngineClient; +import org.apache.inlong.manager.workflow.processor.OfflineJobOperator; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -33,6 +35,9 @@ import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Transactional; +import java.util.List; +import java.util.stream.Collectors; + import static org.apache.inlong.manager.common.enums.ScheduleStatus.APPROVED; import static org.apache.inlong.manager.common.enums.ScheduleStatus.REGISTERED; import static org.apache.inlong.manager.common.enums.ScheduleStatus.UPDATED; @@ -49,10 +54,10 @@ public class ScheduleOperatorImpl implements ScheduleOperator { private InlongGroupExtEntityMapper groupExtMapper; @Autowired - private ScheduleEntityMapper scheduleMapper; + private ScheduleClientFactory scheduleClientFactory; @Autowired - private ScheduleClientFactory scheduleClientFactory; + private OfflineJobOperator offlineJobOperator; private ScheduleEngineClient scheduleEngineClient; @@ -172,4 +177,18 @@ public Boolean handleGroupApprove(String groupId) { return registerToScheduleEngine(scheduleInfo, null, false); } + @Override + public Boolean submitOfflineJob(String groupId, List streamInfoList) { + try { + offlineJobOperator.submitOfflineJob(groupId, streamInfoList); + LOGGER.info("Submit offline job for group {} and stream list {} success.", groupId, + streamInfoList.stream().map(InlongStreamInfo::getName).collect(Collectors.toList())); + } catch (Exception e) { + String errorMsg = String.format("Submit offline job failed for groupId=%s", groupId); + LOGGER.error(errorMsg, e); + throw new BusinessException(errorMsg); + } + return true; + } + } diff --git a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/InlongGroupController.java b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/InlongGroupController.java index 04d89d332c7..90fdaa3f981 100644 --- a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/InlongGroupController.java +++ b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/InlongGroupController.java @@ -252,4 +252,11 @@ public Response finishTagSwitch(@PathVariable String groupId) { return Response.success(groupService.finishTagSwitch(groupId)); } + @RequestMapping(value = "/group/submitOfflineJob/{groupId}", method = RequestMethod.POST) + @ApiOperation(value = "Submitting inlong offline job process") + @ApiImplicitParam(name = "groupId", value = "Inlong group id", dataTypeClass = String.class) + public Response submitOfflineJob(@PathVariable String groupId) { + String operator = LoginUserUtils.getLoginUser().getName(); + return Response.success(groupProcessOperation.restartProcess(groupId, operator)); + } } \ No newline at end of file diff --git a/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/processor/OfflineJobOperator.java b/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/processor/OfflineJobOperator.java new file mode 100644 index 00000000000..f6d395dc7e1 --- /dev/null +++ b/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/processor/OfflineJobOperator.java @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.manager.workflow.processor; + +import org.apache.inlong.manager.pojo.stream.InlongStreamInfo; + +import java.util.List; + +public interface OfflineJobOperator { + + void submitOfflineJob(String groupId, List streamInfoList) throws Exception; +} From 211fd8108a65acf06843fe2b69df794e09243ad2 Mon Sep 17 00:00:00 2001 From: aloyszhang Date: Tue, 25 Jun 2024 19:29:47 +0800 Subject: [PATCH 2/6] improve equals method for schedule info --- .../manager/pojo/schedule/ScheduleInfo.java | 16 ++++++++-------- .../pojo/schedule/ScheduleInfoRequest.java | 16 ++++++++-------- 2 files changed, 16 insertions(+), 16 deletions(-) diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/schedule/ScheduleInfo.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/schedule/ScheduleInfo.java index 24f2e6196ef..0bc8d2e6dbb 100644 --- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/schedule/ScheduleInfo.java +++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/schedule/ScheduleInfo.java @@ -89,16 +89,16 @@ public boolean equals(Object o) { return false; } ScheduleInfo that = (ScheduleInfo) o; - return Objects.equals(id, that.id) && Objects.equals(inlongGroupId, that.inlongGroupId) - && Objects.equals(scheduleType, that.scheduleType) && Objects.equals(scheduleUnit, - that.scheduleUnit) + return Objects.equals(inlongGroupId, that.inlongGroupId) + && Objects.equals(scheduleType, that.scheduleType) + && Objects.equals(scheduleUnit, that.scheduleUnit) && Objects.equals(scheduleInterval, that.scheduleInterval) - && Objects.equals(startTime, that.startTime) && Objects.equals(endTime, that.endTime) - && Objects.equals(delayTime, that.delayTime) && Objects.equals(selfDepend, - that.selfDepend) + && Objects.equals(startTime, that.startTime) + && Objects.equals(endTime, that.endTime) + && Objects.equals(delayTime, that.delayTime) + && Objects.equals(selfDepend, that.selfDepend) && Objects.equals(taskParallelism, that.taskParallelism) - && Objects.equals(crontabExpression, that.crontabExpression) && Objects.equals(version, - that.version); + && Objects.equals(crontabExpression, that.crontabExpression); } @Override diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/schedule/ScheduleInfoRequest.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/schedule/ScheduleInfoRequest.java index a324cee7f57..7e81f7533fa 100644 --- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/schedule/ScheduleInfoRequest.java +++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/schedule/ScheduleInfoRequest.java @@ -83,16 +83,16 @@ public boolean equals(Object o) { return false; } ScheduleInfoRequest that = (ScheduleInfoRequest) o; - return Objects.equals(id, that.id) && Objects.equals(inlongGroupId, that.inlongGroupId) - && Objects.equals(scheduleType, that.scheduleType) && Objects.equals(scheduleUnit, - that.scheduleUnit) + return Objects.equals(inlongGroupId, that.inlongGroupId) + && Objects.equals(scheduleType, that.scheduleType) + && Objects.equals(scheduleUnit, that.scheduleUnit) && Objects.equals(scheduleInterval, that.scheduleInterval) - && Objects.equals(startTime, that.startTime) && Objects.equals(endTime, that.endTime) - && Objects.equals(delayTime, that.delayTime) && Objects.equals(selfDepend, - that.selfDepend) + && Objects.equals(startTime, that.startTime) + && Objects.equals(endTime, that.endTime) + && Objects.equals(delayTime, that.delayTime) + && Objects.equals(selfDepend, that.selfDepend) && Objects.equals(taskParallelism, that.taskParallelism) - && Objects.equals(crontabExpression, that.crontabExpression) && Objects.equals(version, - that.version); + && Objects.equals(crontabExpression, that.crontabExpression); } @Override From dc970e235b480cdd90e852ecbf547228bbabfee4 Mon Sep 17 00:00:00 2001 From: aloyszhang Date: Tue, 25 Jun 2024 19:30:39 +0800 Subject: [PATCH 3/6] improve update logic for schedule info --- .../inlong/manager/schedule/NoopScheduleClient.java | 2 +- .../manager/service/schedule/ScheduleOperatorImpl.java | 8 +++++--- .../manager-web/src/main/resources/application.properties | 6 +----- 3 files changed, 7 insertions(+), 9 deletions(-) diff --git a/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/NoopScheduleClient.java b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/NoopScheduleClient.java index a122235de39..7b7ddf19b0b 100644 --- a/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/NoopScheduleClient.java +++ b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/NoopScheduleClient.java @@ -26,7 +26,7 @@ public class NoopScheduleClient implements ScheduleEngineClient { @Override public boolean accept(String engineType) { - return ScheduleEngineType.NONE.getType().equals(engineType); + return ScheduleEngineType.NONE.getType().equalsIgnoreCase(engineType); } @Override diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/schedule/ScheduleOperatorImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/schedule/ScheduleOperatorImpl.java index 8fbaf69ab8b..e07dfed6b88 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/schedule/ScheduleOperatorImpl.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/schedule/ScheduleOperatorImpl.java @@ -80,7 +80,7 @@ private void registerScheduleInfoForApprovedGroup(ScheduleInfo scheduleInfo, Str String groupId = scheduleInfo.getInlongGroupId(); InlongGroupExtEntity scheduleStatusExt = groupExtMapper.selectByUniqueKey(groupId, InlongConstants.REGISTER_SCHEDULE_STATUS); - if (InlongConstants.REGISTERED.equalsIgnoreCase(scheduleStatusExt.getKeyValue())) { + if (scheduleStatusExt != null && InlongConstants.REGISTERED.equalsIgnoreCase(scheduleStatusExt.getKeyValue())) { // change schedule state to approved scheduleService.updateStatus(scheduleInfo.getInlongGroupId(), APPROVED, operator); registerToScheduleEngine(scheduleInfo, operator, false); @@ -129,8 +129,10 @@ public Boolean updateOpt(ScheduleInfoRequest request, String operator) { @Override @Transactional(rollbackFor = Throwable.class) public Boolean updateAndRegister(ScheduleInfoRequest request, String operator) { - updateOpt(request, operator); - return registerToScheduleEngine(CommonBeanUtils.copyProperties(request, ScheduleInfo::new), operator, true); + if (updateOpt(request, operator)) { + return registerToScheduleEngine(CommonBeanUtils.copyProperties(request, ScheduleInfo::new), operator, true); + } + return false; } /** diff --git a/inlong-manager/manager-web/src/main/resources/application.properties b/inlong-manager/manager-web/src/main/resources/application.properties index a6eec820add..a498ac4d15c 100644 --- a/inlong-manager/manager-web/src/main/resources/application.properties +++ b/inlong-manager/manager-web/src/main/resources/application.properties @@ -65,8 +65,4 @@ audit.admin.ids=3,4,5,6 audit.user.ids=3,4,5,6 # tencent cloud log service endpoint, The Operator cls resource by it -cls.manager.endpoint=127.0.0.1 - -# schedule engine type -# support none(no scheduler) and quartz(quartz scheduler), default is none -inlong.schedule.engine=none \ No newline at end of file +cls.manager.endpoint=127.0.0.1 \ No newline at end of file From 2687ab3a8daf4fc2d413d95e4106b1833a043238 Mon Sep 17 00:00:00 2001 From: aloyszhang Date: Tue, 25 Jun 2024 21:05:33 +0800 Subject: [PATCH 4/6] modify the way to initialize offlineJobOperator in ScheduleOperatorImpl --- .../offline/FlinkOfflineJobOperator.java | 4 +- .../manager/plugin/util/FlinkUtils.java | 2 +- .../schedule/OfflineJobOperatorFactory.java | 51 +++++++++++++++++++ .../schedule/ScheduleOperatorImpl.java | 4 +- 4 files changed, 57 insertions(+), 4 deletions(-) create mode 100644 inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/schedule/OfflineJobOperatorFactory.java diff --git a/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/offline/FlinkOfflineJobOperator.java b/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/offline/FlinkOfflineJobOperator.java index 48d547a970c..09740c53f34 100644 --- a/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/offline/FlinkOfflineJobOperator.java +++ b/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/offline/FlinkOfflineJobOperator.java @@ -20,13 +20,13 @@ import org.apache.inlong.manager.pojo.stream.InlongStreamInfo; import org.apache.inlong.manager.workflow.processor.OfflineJobOperator; -import org.springframework.stereotype.Component; +import lombok.NoArgsConstructor; import java.util.List; import static org.apache.inlong.manager.plugin.util.FlinkUtils.submitFlinkJobs; -@Component +@NoArgsConstructor public class FlinkOfflineJobOperator implements OfflineJobOperator { @Override diff --git a/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/util/FlinkUtils.java b/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/util/FlinkUtils.java index 7e04c6c41e7..92285ab679e 100644 --- a/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/util/FlinkUtils.java +++ b/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/util/FlinkUtils.java @@ -240,7 +240,7 @@ public static ListenerResult submitFlinkJobs(String groupId, List failedStreams = listenerResults.stream() .filter(t -> !t.isSuccess()).collect(Collectors.toList()); if (failedStreams.isEmpty()) { - ListenerResult.success(); + return ListenerResult.success(); } return ListenerResult.fail(failedStreams.get(0).getRemark()); } diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/schedule/OfflineJobOperatorFactory.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/schedule/OfflineJobOperatorFactory.java new file mode 100644 index 00000000000..001f303a7ba --- /dev/null +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/schedule/OfflineJobOperatorFactory.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.manager.service.schedule; + +import org.apache.inlong.manager.common.exceptions.BusinessException; +import org.apache.inlong.manager.workflow.processor.OfflineJobOperator; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class OfflineJobOperatorFactory { + + private static final Logger log = LoggerFactory.getLogger(OfflineJobOperatorFactory.class); + public static final String DEFAULT_OPERATOR_CLASS_NAME = + "org.apache.inlong.manager.plugin.offline.FlinkOfflineJobOperator"; + + public static OfflineJobOperator getOfflineJobOperator() { + return getOfflineJobOperator(DEFAULT_OPERATOR_CLASS_NAME); + } + + public static OfflineJobOperator getOfflineJobOperator(String operatorClassName) { + return getOfflineJobOperator(operatorClassName, Thread.currentThread().getContextClassLoader()); + } + + public static OfflineJobOperator getOfflineJobOperator(String operatorClassName, ClassLoader classLoader) { + try { + Class operatorClass = classLoader.loadClass(operatorClassName); + Object operator = operatorClass.getDeclaredConstructor().newInstance(); + return (OfflineJobOperator) operator; + } catch (Throwable e) { + log.error("Failed to get offline job operator: ", e); + throw new BusinessException("Failed to get offline job operator: " + e.getMessage()); + } + } + +} diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/schedule/ScheduleOperatorImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/schedule/ScheduleOperatorImpl.java index e07dfed6b88..a20fa0cb104 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/schedule/ScheduleOperatorImpl.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/schedule/ScheduleOperatorImpl.java @@ -56,7 +56,6 @@ public class ScheduleOperatorImpl implements ScheduleOperator { @Autowired private ScheduleClientFactory scheduleClientFactory; - @Autowired private OfflineJobOperator offlineJobOperator; private ScheduleEngineClient scheduleEngineClient; @@ -181,6 +180,9 @@ public Boolean handleGroupApprove(String groupId) { @Override public Boolean submitOfflineJob(String groupId, List streamInfoList) { + if (offlineJobOperator == null) { + offlineJobOperator = OfflineJobOperatorFactory.getOfflineJobOperator(); + } try { offlineJobOperator.submitOfflineJob(groupId, streamInfoList); LOGGER.info("Submit offline job for group {} and stream list {} success.", groupId, From 7727f4aa25d535d6cb058c0b9b103ec4394f5333 Mon Sep 17 00:00:00 2001 From: aloyszhang Date: Wed, 26 Jun 2024 10:17:44 +0800 Subject: [PATCH 5/6] improve log --- .../java/org/apache/inlong/manager/plugin/util/FlinkUtils.java | 2 +- .../inlong/manager/service/group/InlongGroupServiceImpl.java | 2 +- .../manager/service/listener/sort/SortConfigListener.java | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/util/FlinkUtils.java b/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/util/FlinkUtils.java index 92285ab679e..82a0d628c1c 100644 --- a/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/util/FlinkUtils.java +++ b/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/util/FlinkUtils.java @@ -224,7 +224,7 @@ public static ListenerResult submitFlinkJobs(String groupId, List s.getSinkList() == null ? 0 : s.getSinkList().size()) .reduce(0, Integer::sum); if (sinkCount == 0) { - log.warn("not any sink configured for group {} and stream list {}, skip launching sort job", groupId, + log.warn("Not any sink configured for group {} and stream list {}, skip launching sort job", groupId, streamInfoList.stream() .map(s -> s.getInlongGroupId() + ":" + s.getName()).collect(Collectors.toList())); return ListenerResult.success(); diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupServiceImpl.java index 4430c9cdce3..d2f3c0c948a 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupServiceImpl.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupServiceImpl.java @@ -936,7 +936,7 @@ public Boolean submitOfflineJob(String groupId) { // 1. get stream info list InlongGroupInfo groupInfo = get(groupId); if (groupInfo == null) { - String msg = String.format("InLong group not found for groupId=%s", groupId); + String msg = String.format("InLong group not found for group=%s", groupId); LOGGER.error(msg); throw new BusinessException(ErrorCodeEnum.GROUP_NOT_FOUND); } diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/sort/SortConfigListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/sort/SortConfigListener.java index 5fadf2d82c8..556a4c41fa4 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/sort/SortConfigListener.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/sort/SortConfigListener.java @@ -137,7 +137,7 @@ public ListenerResult listen(WorkflowContext context) throws WorkflowListenerExc } } } catch (Exception e) { - String msg = String.format("failed to build sort config for groupId=%s, ", groupId); + String msg = String.format("Failed to build sort config for group=%s, ", groupId); LOGGER.error("{} streamInfos={}", msg, streamInfos, e); throw new WorkflowListenerException(msg + e.getMessage()); } From 5e62d63d196ee34e1470d850d8d71116db628c19 Mon Sep 17 00:00:00 2001 From: aloyszhang Date: Wed, 26 Jun 2024 10:45:34 +0800 Subject: [PATCH 6/6] fix submitOfflineJob interface in InLongGroupController --- .../inlong/manager/web/controller/InlongGroupController.java | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/InlongGroupController.java b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/InlongGroupController.java index 90fdaa3f981..3a9389c2973 100644 --- a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/InlongGroupController.java +++ b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/InlongGroupController.java @@ -255,8 +255,7 @@ public Response finishTagSwitch(@PathVariable String groupId) { @RequestMapping(value = "/group/submitOfflineJob/{groupId}", method = RequestMethod.POST) @ApiOperation(value = "Submitting inlong offline job process") @ApiImplicitParam(name = "groupId", value = "Inlong group id", dataTypeClass = String.class) - public Response submitOfflineJob(@PathVariable String groupId) { - String operator = LoginUserUtils.getLoginUser().getName(); - return Response.success(groupProcessOperation.restartProcess(groupId, operator)); + public Response submitOfflineJob(@PathVariable String groupId) { + return Response.success(groupService.submitOfflineJob(groupId)); } } \ No newline at end of file