diff --git a/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/listener/StartupSortListener.java b/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/listener/StartupSortListener.java index b0624fbe662..5d1dd82578d 100644 --- a/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/listener/StartupSortListener.java +++ b/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/listener/StartupSortListener.java @@ -20,8 +20,6 @@ import org.apache.inlong.manager.common.consts.InlongConstants; import org.apache.inlong.manager.common.enums.GroupOperateType; import org.apache.inlong.manager.common.enums.TaskEvent; -import org.apache.inlong.manager.plugin.util.FlinkUtils; -import org.apache.inlong.manager.pojo.group.InlongGroupInfo; import org.apache.inlong.manager.pojo.stream.InlongStreamExtInfo; import org.apache.inlong.manager.pojo.stream.InlongStreamInfo; import org.apache.inlong.manager.pojo.workflow.form.process.GroupResourceProcessForm; @@ -32,9 +30,9 @@ import lombok.extern.slf4j.Slf4j; -import java.util.ArrayList; import java.util.List; -import java.util.stream.Collectors; + +import static org.apache.inlong.manager.plugin.util.FlinkUtils.submitFlinkJobs; /** * Listener of startup sort. @@ -62,8 +60,7 @@ public boolean accept(WorkflowContext workflowContext) { } log.info("add startup group listener for groupId [{}]", groupId); - return (InlongConstants.DATASYNC_REALTIME_MODE.equals(groupProcessForm.getGroupInfo().getInlongGroupMode()) - || InlongConstants.DATASYNC_OFFLINE_MODE.equals(groupProcessForm.getGroupInfo().getInlongGroupMode())); + return (InlongConstants.DATASYNC_REALTIME_MODE.equals(groupProcessForm.getGroupInfo().getInlongGroupMode())); } @Override @@ -77,36 +74,9 @@ public ListenerResult listen(WorkflowContext context) throws Exception { } GroupResourceProcessForm groupResourceForm = (GroupResourceProcessForm) processForm; - InlongGroupInfo groupInfo = groupResourceForm.getGroupInfo(); - // do not build sort config if the group mode is offline - if (InlongConstants.DATASYNC_OFFLINE_MODE.equals(groupInfo.getInlongGroupMode())) { - log.info("no need to launching sort job for groupId={} as the mode is offline", - groupId); - return ListenerResult.success(); - } List streamInfos = groupResourceForm.getStreamInfos(); - int sinkCount = streamInfos.stream() - .map(s -> s.getSinkList() == null ? 0 : s.getSinkList().size()) - .reduce(0, Integer::sum); - if (sinkCount == 0) { - log.warn("not any sink configured for group {}, skip launching sort job", groupId); - return ListenerResult.success(); - } - List listenerResults = new ArrayList<>(); - for (InlongStreamInfo streamInfo : streamInfos) { - listenerResults.add(FlinkUtils.submitFlinkJob(streamInfo, - FlinkUtils.genFlinkJobName(processForm, streamInfo))); - } - - // only one stream in group for now - // we can return the list of ListenerResult if support multi-stream in the future - List failedStreams = listenerResults.stream() - .filter(t -> !t.isSuccess()).collect(Collectors.toList()); - if (failedStreams.isEmpty()) { - return ListenerResult.success(); - } - return ListenerResult.fail(failedStreams.get(0).getRemark()); + return submitFlinkJobs(groupId, streamInfos); } /** diff --git a/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/offline/FlinkOfflineJobOperator.java b/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/offline/FlinkOfflineJobOperator.java new file mode 100644 index 00000000000..09740c53f34 --- /dev/null +++ b/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/offline/FlinkOfflineJobOperator.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.manager.plugin.offline; + +import org.apache.inlong.manager.pojo.stream.InlongStreamInfo; +import org.apache.inlong.manager.workflow.processor.OfflineJobOperator; + +import lombok.NoArgsConstructor; + +import java.util.List; + +import static org.apache.inlong.manager.plugin.util.FlinkUtils.submitFlinkJobs; + +@NoArgsConstructor +public class FlinkOfflineJobOperator implements OfflineJobOperator { + + @Override + public void submitOfflineJob(String groupId, List streamInfoList) throws Exception { + submitFlinkJobs(groupId, streamInfoList); + } +} diff --git a/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/util/FlinkUtils.java b/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/util/FlinkUtils.java index d4c7863371d..82a0d628c1c 100644 --- a/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/util/FlinkUtils.java +++ b/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/util/FlinkUtils.java @@ -218,6 +218,33 @@ public static FlinkConfig getFlinkConfigFromFile() throws Exception { return flinkConfig; } + public static ListenerResult submitFlinkJobs(String groupId, List streamInfoList) + throws Exception { + int sinkCount = streamInfoList.stream() + .map(s -> s.getSinkList() == null ? 0 : s.getSinkList().size()) + .reduce(0, Integer::sum); + if (sinkCount == 0) { + log.warn("Not any sink configured for group {} and stream list {}, skip launching sort job", groupId, + streamInfoList.stream() + .map(s -> s.getInlongGroupId() + ":" + s.getName()).collect(Collectors.toList())); + return ListenerResult.success(); + } + + List listenerResults = new ArrayList<>(); + for (InlongStreamInfo streamInfo : streamInfoList) { + listenerResults.add(FlinkUtils.submitFlinkJob(streamInfo, FlinkUtils.genFlinkJobName(streamInfo))); + } + + // only one stream in group for now + // we can return the list of ListenerResult if support multi-stream in the future + List failedStreams = listenerResults.stream() + .filter(t -> !t.isSuccess()).collect(Collectors.toList()); + if (failedStreams.isEmpty()) { + return ListenerResult.success(); + } + return ListenerResult.fail(failedStreams.get(0).getRemark()); + } + public static ListenerResult submitFlinkJob(InlongStreamInfo streamInfo, String jobName) throws Exception { List sinkList = streamInfo.getSinkList(); List sinkTypes = sinkList.stream().map(StreamSink::getSinkType).collect(Collectors.toList()); @@ -295,4 +322,9 @@ public static String genFlinkJobName(ProcessForm processForm, InlongStreamInfo s return Constants.SORT_JOB_NAME_GENERATOR.apply(processForm) + InlongConstants.HYPHEN + streamInfo.getInlongStreamId(); } + + public static String genFlinkJobName(InlongStreamInfo streamInfo) { + return String.format(Constants.SORT_JOB_NAME_TEMPLATE, streamInfo.getInlongGroupId()) + InlongConstants.HYPHEN + + streamInfo.getInlongStreamId(); + } } diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/schedule/ScheduleInfo.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/schedule/ScheduleInfo.java index 24f2e6196ef..0bc8d2e6dbb 100644 --- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/schedule/ScheduleInfo.java +++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/schedule/ScheduleInfo.java @@ -89,16 +89,16 @@ public boolean equals(Object o) { return false; } ScheduleInfo that = (ScheduleInfo) o; - return Objects.equals(id, that.id) && Objects.equals(inlongGroupId, that.inlongGroupId) - && Objects.equals(scheduleType, that.scheduleType) && Objects.equals(scheduleUnit, - that.scheduleUnit) + return Objects.equals(inlongGroupId, that.inlongGroupId) + && Objects.equals(scheduleType, that.scheduleType) + && Objects.equals(scheduleUnit, that.scheduleUnit) && Objects.equals(scheduleInterval, that.scheduleInterval) - && Objects.equals(startTime, that.startTime) && Objects.equals(endTime, that.endTime) - && Objects.equals(delayTime, that.delayTime) && Objects.equals(selfDepend, - that.selfDepend) + && Objects.equals(startTime, that.startTime) + && Objects.equals(endTime, that.endTime) + && Objects.equals(delayTime, that.delayTime) + && Objects.equals(selfDepend, that.selfDepend) && Objects.equals(taskParallelism, that.taskParallelism) - && Objects.equals(crontabExpression, that.crontabExpression) && Objects.equals(version, - that.version); + && Objects.equals(crontabExpression, that.crontabExpression); } @Override diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/schedule/ScheduleInfoRequest.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/schedule/ScheduleInfoRequest.java index a324cee7f57..7e81f7533fa 100644 --- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/schedule/ScheduleInfoRequest.java +++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/schedule/ScheduleInfoRequest.java @@ -83,16 +83,16 @@ public boolean equals(Object o) { return false; } ScheduleInfoRequest that = (ScheduleInfoRequest) o; - return Objects.equals(id, that.id) && Objects.equals(inlongGroupId, that.inlongGroupId) - && Objects.equals(scheduleType, that.scheduleType) && Objects.equals(scheduleUnit, - that.scheduleUnit) + return Objects.equals(inlongGroupId, that.inlongGroupId) + && Objects.equals(scheduleType, that.scheduleType) + && Objects.equals(scheduleUnit, that.scheduleUnit) && Objects.equals(scheduleInterval, that.scheduleInterval) - && Objects.equals(startTime, that.startTime) && Objects.equals(endTime, that.endTime) - && Objects.equals(delayTime, that.delayTime) && Objects.equals(selfDepend, - that.selfDepend) + && Objects.equals(startTime, that.startTime) + && Objects.equals(endTime, that.endTime) + && Objects.equals(delayTime, that.delayTime) + && Objects.equals(selfDepend, that.selfDepend) && Objects.equals(taskParallelism, that.taskParallelism) - && Objects.equals(crontabExpression, that.crontabExpression) && Objects.equals(version, - that.version); + && Objects.equals(crontabExpression, that.crontabExpression); } @Override diff --git a/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/NoopScheduleClient.java b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/NoopScheduleClient.java index a122235de39..7b7ddf19b0b 100644 --- a/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/NoopScheduleClient.java +++ b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/NoopScheduleClient.java @@ -26,7 +26,7 @@ public class NoopScheduleClient implements ScheduleEngineClient { @Override public boolean accept(String engineType) { - return ScheduleEngineType.NONE.getType().equals(engineType); + return ScheduleEngineType.NONE.getType().equalsIgnoreCase(engineType); } @Override diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupService.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupService.java index 4906e91aa7e..c6643c3259b 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupService.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupService.java @@ -218,4 +218,11 @@ void updateAfterApprove( */ List getGroupByBackUpClusterTag(String clusterTag); + /** + * Submitting offline job for the given group. + * @param groupId the inlong group to submit offline job + * + * */ + Boolean submitOfflineJob(String groupId); + } diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupServiceImpl.java index d42a61dbfb0..d2f3c0c948a 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupServiceImpl.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupServiceImpl.java @@ -931,4 +931,22 @@ public List getGroupByBackUpClusterTag(String clusterTag) { return groupInfoList; } + @Override + public Boolean submitOfflineJob(String groupId) { + // 1. get stream info list + InlongGroupInfo groupInfo = get(groupId); + if (groupInfo == null) { + String msg = String.format("InLong group not found for group=%s", groupId); + LOGGER.error(msg); + throw new BusinessException(ErrorCodeEnum.GROUP_NOT_FOUND); + } + + List streamInfoList = streamService.list(groupId); + if (CollectionUtils.isEmpty(streamInfoList)) { + LOGGER.warn("No stream info found for group {}, skip submit offline job", groupId); + return false; + } + return scheduleOperator.submitOfflineJob(groupId, streamInfoList); + } + } diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/sort/SortConfigListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/sort/SortConfigListener.java index 067a320a495..556a4c41fa4 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/sort/SortConfigListener.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/sort/SortConfigListener.java @@ -125,12 +125,6 @@ public ListenerResult listen(WorkflowContext context) throws WorkflowListenerExc try { for (InlongStreamInfo streamInfo : streamInfos) { - // do not build sort config if the group mode is offline - if (InlongConstants.DATASYNC_OFFLINE_MODE.equals(groupInfo.getInlongGroupMode())) { - LOGGER.info("no need to build sort config for groupId={} streamId={} as the mode is offline", - groupId, streamInfo.getInlongStreamId()); - continue; - } List sinkList = streamInfo.getSinkList(); if (CollectionUtils.isEmpty(sinkList)) { continue; @@ -143,8 +137,8 @@ public ListenerResult listen(WorkflowContext context) throws WorkflowListenerExc } } } catch (Exception e) { - String msg = String.format("failed to build sort config for groupId=%s, ", groupId); - LOGGER.error(msg + "streamInfos=" + streamInfos, e); + String msg = String.format("Failed to build sort config for group=%s, ", groupId); + LOGGER.error("{} streamInfos={}", msg, streamInfos, e); throw new WorkflowListenerException(msg + e.getMessage()); } diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/schedule/OfflineJobOperatorFactory.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/schedule/OfflineJobOperatorFactory.java new file mode 100644 index 00000000000..001f303a7ba --- /dev/null +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/schedule/OfflineJobOperatorFactory.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.manager.service.schedule; + +import org.apache.inlong.manager.common.exceptions.BusinessException; +import org.apache.inlong.manager.workflow.processor.OfflineJobOperator; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class OfflineJobOperatorFactory { + + private static final Logger log = LoggerFactory.getLogger(OfflineJobOperatorFactory.class); + public static final String DEFAULT_OPERATOR_CLASS_NAME = + "org.apache.inlong.manager.plugin.offline.FlinkOfflineJobOperator"; + + public static OfflineJobOperator getOfflineJobOperator() { + return getOfflineJobOperator(DEFAULT_OPERATOR_CLASS_NAME); + } + + public static OfflineJobOperator getOfflineJobOperator(String operatorClassName) { + return getOfflineJobOperator(operatorClassName, Thread.currentThread().getContextClassLoader()); + } + + public static OfflineJobOperator getOfflineJobOperator(String operatorClassName, ClassLoader classLoader) { + try { + Class operatorClass = classLoader.loadClass(operatorClassName); + Object operator = operatorClass.getDeclaredConstructor().newInstance(); + return (OfflineJobOperator) operator; + } catch (Throwable e) { + log.error("Failed to get offline job operator: ", e); + throw new BusinessException("Failed to get offline job operator: " + e.getMessage()); + } + } + +} diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/schedule/ScheduleOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/schedule/ScheduleOperator.java index 6bf9c014326..653523294e1 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/schedule/ScheduleOperator.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/schedule/ScheduleOperator.java @@ -19,6 +19,9 @@ import org.apache.inlong.manager.pojo.schedule.ScheduleInfo; import org.apache.inlong.manager.pojo.schedule.ScheduleInfoRequest; +import org.apache.inlong.manager.pojo.stream.InlongStreamInfo; + +import java.util.List; /** * Operator for schedule. Including: @@ -92,4 +95,12 @@ public interface ScheduleOperator { * @Return whether succeed * */ Boolean handleGroupApprove(String groupId); + + /** + * Start offline sync job when the schedule instance callback. + * @param groupId groupId to start offline job + * @param streamInfoList stream list to start offline job + * @Return whether succeed + * */ + Boolean submitOfflineJob(String groupId, List streamInfoList); } diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/schedule/ScheduleOperatorImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/schedule/ScheduleOperatorImpl.java index 411a80766a8..a20fa0cb104 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/schedule/ScheduleOperatorImpl.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/schedule/ScheduleOperatorImpl.java @@ -18,14 +18,16 @@ package org.apache.inlong.manager.service.schedule; import org.apache.inlong.manager.common.consts.InlongConstants; +import org.apache.inlong.manager.common.exceptions.BusinessException; import org.apache.inlong.manager.common.util.CommonBeanUtils; import org.apache.inlong.manager.dao.entity.InlongGroupExtEntity; import org.apache.inlong.manager.dao.mapper.InlongGroupExtEntityMapper; -import org.apache.inlong.manager.dao.mapper.ScheduleEntityMapper; import org.apache.inlong.manager.pojo.schedule.ScheduleInfo; import org.apache.inlong.manager.pojo.schedule.ScheduleInfoRequest; +import org.apache.inlong.manager.pojo.stream.InlongStreamInfo; import org.apache.inlong.manager.schedule.ScheduleClientFactory; import org.apache.inlong.manager.schedule.ScheduleEngineClient; +import org.apache.inlong.manager.workflow.processor.OfflineJobOperator; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -33,6 +35,9 @@ import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Transactional; +import java.util.List; +import java.util.stream.Collectors; + import static org.apache.inlong.manager.common.enums.ScheduleStatus.APPROVED; import static org.apache.inlong.manager.common.enums.ScheduleStatus.REGISTERED; import static org.apache.inlong.manager.common.enums.ScheduleStatus.UPDATED; @@ -48,12 +53,11 @@ public class ScheduleOperatorImpl implements ScheduleOperator { @Autowired private InlongGroupExtEntityMapper groupExtMapper; - @Autowired - private ScheduleEntityMapper scheduleMapper; - @Autowired private ScheduleClientFactory scheduleClientFactory; + private OfflineJobOperator offlineJobOperator; + private ScheduleEngineClient scheduleEngineClient; @Override @@ -75,7 +79,7 @@ private void registerScheduleInfoForApprovedGroup(ScheduleInfo scheduleInfo, Str String groupId = scheduleInfo.getInlongGroupId(); InlongGroupExtEntity scheduleStatusExt = groupExtMapper.selectByUniqueKey(groupId, InlongConstants.REGISTER_SCHEDULE_STATUS); - if (InlongConstants.REGISTERED.equalsIgnoreCase(scheduleStatusExt.getKeyValue())) { + if (scheduleStatusExt != null && InlongConstants.REGISTERED.equalsIgnoreCase(scheduleStatusExt.getKeyValue())) { // change schedule state to approved scheduleService.updateStatus(scheduleInfo.getInlongGroupId(), APPROVED, operator); registerToScheduleEngine(scheduleInfo, operator, false); @@ -124,8 +128,10 @@ public Boolean updateOpt(ScheduleInfoRequest request, String operator) { @Override @Transactional(rollbackFor = Throwable.class) public Boolean updateAndRegister(ScheduleInfoRequest request, String operator) { - updateOpt(request, operator); - return registerToScheduleEngine(CommonBeanUtils.copyProperties(request, ScheduleInfo::new), operator, true); + if (updateOpt(request, operator)) { + return registerToScheduleEngine(CommonBeanUtils.copyProperties(request, ScheduleInfo::new), operator, true); + } + return false; } /** @@ -172,4 +178,21 @@ public Boolean handleGroupApprove(String groupId) { return registerToScheduleEngine(scheduleInfo, null, false); } + @Override + public Boolean submitOfflineJob(String groupId, List streamInfoList) { + if (offlineJobOperator == null) { + offlineJobOperator = OfflineJobOperatorFactory.getOfflineJobOperator(); + } + try { + offlineJobOperator.submitOfflineJob(groupId, streamInfoList); + LOGGER.info("Submit offline job for group {} and stream list {} success.", groupId, + streamInfoList.stream().map(InlongStreamInfo::getName).collect(Collectors.toList())); + } catch (Exception e) { + String errorMsg = String.format("Submit offline job failed for groupId=%s", groupId); + LOGGER.error(errorMsg, e); + throw new BusinessException(errorMsg); + } + return true; + } + } diff --git a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/InlongGroupController.java b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/InlongGroupController.java index 04d89d332c7..3a9389c2973 100644 --- a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/InlongGroupController.java +++ b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/InlongGroupController.java @@ -252,4 +252,10 @@ public Response finishTagSwitch(@PathVariable String groupId) { return Response.success(groupService.finishTagSwitch(groupId)); } + @RequestMapping(value = "/group/submitOfflineJob/{groupId}", method = RequestMethod.POST) + @ApiOperation(value = "Submitting inlong offline job process") + @ApiImplicitParam(name = "groupId", value = "Inlong group id", dataTypeClass = String.class) + public Response submitOfflineJob(@PathVariable String groupId) { + return Response.success(groupService.submitOfflineJob(groupId)); + } } \ No newline at end of file diff --git a/inlong-manager/manager-web/src/main/resources/application.properties b/inlong-manager/manager-web/src/main/resources/application.properties index a6eec820add..a498ac4d15c 100644 --- a/inlong-manager/manager-web/src/main/resources/application.properties +++ b/inlong-manager/manager-web/src/main/resources/application.properties @@ -65,8 +65,4 @@ audit.admin.ids=3,4,5,6 audit.user.ids=3,4,5,6 # tencent cloud log service endpoint, The Operator cls resource by it -cls.manager.endpoint=127.0.0.1 - -# schedule engine type -# support none(no scheduler) and quartz(quartz scheduler), default is none -inlong.schedule.engine=none \ No newline at end of file +cls.manager.endpoint=127.0.0.1 \ No newline at end of file diff --git a/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/processor/OfflineJobOperator.java b/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/processor/OfflineJobOperator.java new file mode 100644 index 00000000000..f6d395dc7e1 --- /dev/null +++ b/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/processor/OfflineJobOperator.java @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.manager.workflow.processor; + +import org.apache.inlong.manager.pojo.stream.InlongStreamInfo; + +import java.util.List; + +public interface OfflineJobOperator { + + void submitOfflineJob(String groupId, List streamInfoList) throws Exception; +}