Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[INLONG-10459][Manager] Support schedule instance callback to submit Flink batch job #10510

Merged
merged 6 commits into from
Jun 26, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,6 @@
import org.apache.inlong.manager.common.consts.InlongConstants;
import org.apache.inlong.manager.common.enums.GroupOperateType;
import org.apache.inlong.manager.common.enums.TaskEvent;
import org.apache.inlong.manager.plugin.util.FlinkUtils;
import org.apache.inlong.manager.pojo.group.InlongGroupInfo;
import org.apache.inlong.manager.pojo.stream.InlongStreamExtInfo;
import org.apache.inlong.manager.pojo.stream.InlongStreamInfo;
import org.apache.inlong.manager.pojo.workflow.form.process.GroupResourceProcessForm;
Expand All @@ -32,9 +30,9 @@

import lombok.extern.slf4j.Slf4j;

import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;

import static org.apache.inlong.manager.plugin.util.FlinkUtils.submitFlinkJobs;

/**
* Listener of startup sort.
Expand Down Expand Up @@ -62,8 +60,7 @@ public boolean accept(WorkflowContext workflowContext) {
}

log.info("add startup group listener for groupId [{}]", groupId);
return (InlongConstants.DATASYNC_REALTIME_MODE.equals(groupProcessForm.getGroupInfo().getInlongGroupMode())
|| InlongConstants.DATASYNC_OFFLINE_MODE.equals(groupProcessForm.getGroupInfo().getInlongGroupMode()));
return (InlongConstants.DATASYNC_REALTIME_MODE.equals(groupProcessForm.getGroupInfo().getInlongGroupMode()));
}

@Override
Expand All @@ -77,36 +74,9 @@ public ListenerResult listen(WorkflowContext context) throws Exception {
}

GroupResourceProcessForm groupResourceForm = (GroupResourceProcessForm) processForm;
InlongGroupInfo groupInfo = groupResourceForm.getGroupInfo();
// do not build sort config if the group mode is offline
if (InlongConstants.DATASYNC_OFFLINE_MODE.equals(groupInfo.getInlongGroupMode())) {
log.info("no need to launching sort job for groupId={} as the mode is offline",
groupId);
return ListenerResult.success();
}
List<InlongStreamInfo> streamInfos = groupResourceForm.getStreamInfos();
int sinkCount = streamInfos.stream()
.map(s -> s.getSinkList() == null ? 0 : s.getSinkList().size())
.reduce(0, Integer::sum);
if (sinkCount == 0) {
log.warn("not any sink configured for group {}, skip launching sort job", groupId);
return ListenerResult.success();
}

List<ListenerResult> listenerResults = new ArrayList<>();
for (InlongStreamInfo streamInfo : streamInfos) {
listenerResults.add(FlinkUtils.submitFlinkJob(streamInfo,
FlinkUtils.genFlinkJobName(processForm, streamInfo)));
}

// only one stream in group for now
// we can return the list of ListenerResult if support multi-stream in the future
List<ListenerResult> failedStreams = listenerResults.stream()
.filter(t -> !t.isSuccess()).collect(Collectors.toList());
if (failedStreams.isEmpty()) {
return ListenerResult.success();
}
return ListenerResult.fail(failedStreams.get(0).getRemark());
return submitFlinkJobs(groupId, streamInfos);
}

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.inlong.manager.plugin.offline;

import org.apache.inlong.manager.pojo.stream.InlongStreamInfo;
import org.apache.inlong.manager.workflow.processor.OfflineJobOperator;

import org.springframework.stereotype.Component;

import java.util.List;

import static org.apache.inlong.manager.plugin.util.FlinkUtils.submitFlinkJobs;

@Component
public class FlinkOfflineJobOperator implements OfflineJobOperator {

@Override
public void submitOfflineJob(String groupId, List<InlongStreamInfo> streamInfoList) throws Exception {
submitFlinkJobs(groupId, streamInfoList);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,33 @@ public static FlinkConfig getFlinkConfigFromFile() throws Exception {
return flinkConfig;
}

public static ListenerResult submitFlinkJobs(String groupId, List<InlongStreamInfo> streamInfoList)
throws Exception {
int sinkCount = streamInfoList.stream()
.map(s -> s.getSinkList() == null ? 0 : s.getSinkList().size())
.reduce(0, Integer::sum);
if (sinkCount == 0) {
log.warn("not any sink configured for group {} and stream list {}, skip launching sort job", groupId,
streamInfoList.stream()
.map(s -> s.getInlongGroupId() + ":" + s.getName()).collect(Collectors.toList()));
return ListenerResult.success();
}

List<ListenerResult> listenerResults = new ArrayList<>();
for (InlongStreamInfo streamInfo : streamInfoList) {
listenerResults.add(FlinkUtils.submitFlinkJob(streamInfo, FlinkUtils.genFlinkJobName(streamInfo)));
}

// only one stream in group for now
// we can return the list of ListenerResult if support multi-stream in the future
List<ListenerResult> failedStreams = listenerResults.stream()
.filter(t -> !t.isSuccess()).collect(Collectors.toList());
if (failedStreams.isEmpty()) {
ListenerResult.success();
aloyszhang marked this conversation as resolved.
Show resolved Hide resolved
}
return ListenerResult.fail(failedStreams.get(0).getRemark());
}

public static ListenerResult submitFlinkJob(InlongStreamInfo streamInfo, String jobName) throws Exception {
List<StreamSink> sinkList = streamInfo.getSinkList();
List<String> sinkTypes = sinkList.stream().map(StreamSink::getSinkType).collect(Collectors.toList());
Expand Down Expand Up @@ -295,4 +322,9 @@ public static String genFlinkJobName(ProcessForm processForm, InlongStreamInfo s
return Constants.SORT_JOB_NAME_GENERATOR.apply(processForm) + InlongConstants.HYPHEN
+ streamInfo.getInlongStreamId();
}

public static String genFlinkJobName(InlongStreamInfo streamInfo) {
return String.format(Constants.SORT_JOB_NAME_TEMPLATE, streamInfo.getInlongGroupId()) + InlongConstants.HYPHEN
+ streamInfo.getInlongStreamId();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -89,16 +89,16 @@ public boolean equals(Object o) {
return false;
}
ScheduleInfo that = (ScheduleInfo) o;
return Objects.equals(id, that.id) && Objects.equals(inlongGroupId, that.inlongGroupId)
&& Objects.equals(scheduleType, that.scheduleType) && Objects.equals(scheduleUnit,
that.scheduleUnit)
return Objects.equals(inlongGroupId, that.inlongGroupId)
&& Objects.equals(scheduleType, that.scheduleType)
&& Objects.equals(scheduleUnit, that.scheduleUnit)
&& Objects.equals(scheduleInterval, that.scheduleInterval)
&& Objects.equals(startTime, that.startTime) && Objects.equals(endTime, that.endTime)
&& Objects.equals(delayTime, that.delayTime) && Objects.equals(selfDepend,
that.selfDepend)
&& Objects.equals(startTime, that.startTime)
&& Objects.equals(endTime, that.endTime)
&& Objects.equals(delayTime, that.delayTime)
&& Objects.equals(selfDepend, that.selfDepend)
&& Objects.equals(taskParallelism, that.taskParallelism)
&& Objects.equals(crontabExpression, that.crontabExpression) && Objects.equals(version,
that.version);
&& Objects.equals(crontabExpression, that.crontabExpression);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,16 +83,16 @@ public boolean equals(Object o) {
return false;
}
ScheduleInfoRequest that = (ScheduleInfoRequest) o;
return Objects.equals(id, that.id) && Objects.equals(inlongGroupId, that.inlongGroupId)
&& Objects.equals(scheduleType, that.scheduleType) && Objects.equals(scheduleUnit,
that.scheduleUnit)
return Objects.equals(inlongGroupId, that.inlongGroupId)
&& Objects.equals(scheduleType, that.scheduleType)
&& Objects.equals(scheduleUnit, that.scheduleUnit)
&& Objects.equals(scheduleInterval, that.scheduleInterval)
&& Objects.equals(startTime, that.startTime) && Objects.equals(endTime, that.endTime)
&& Objects.equals(delayTime, that.delayTime) && Objects.equals(selfDepend,
that.selfDepend)
&& Objects.equals(startTime, that.startTime)
&& Objects.equals(endTime, that.endTime)
&& Objects.equals(delayTime, that.delayTime)
&& Objects.equals(selfDepend, that.selfDepend)
&& Objects.equals(taskParallelism, that.taskParallelism)
&& Objects.equals(crontabExpression, that.crontabExpression) && Objects.equals(version,
that.version);
&& Objects.equals(crontabExpression, that.crontabExpression);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ public class NoopScheduleClient implements ScheduleEngineClient {

@Override
public boolean accept(String engineType) {
return ScheduleEngineType.NONE.getType().equals(engineType);
return ScheduleEngineType.NONE.getType().equalsIgnoreCase(engineType);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -218,4 +218,11 @@ void updateAfterApprove(
*/
List<GroupFullInfo> getGroupByBackUpClusterTag(String clusterTag);

/**
* Submitting offline job for the given group.
* @param groupId the inlong group to submit offline job
*
* */
Boolean submitOfflineJob(String groupId);

}
Original file line number Diff line number Diff line change
Expand Up @@ -931,4 +931,22 @@ public List<GroupFullInfo> getGroupByBackUpClusterTag(String clusterTag) {
return groupInfoList;
}

@Override
public Boolean submitOfflineJob(String groupId) {
// 1. get stream info list
InlongGroupInfo groupInfo = get(groupId);
if (groupInfo == null) {
String msg = String.format("InLong group not found for groupId=%s", groupId);
LOGGER.error(msg);
throw new BusinessException(ErrorCodeEnum.GROUP_NOT_FOUND);
}

List<InlongStreamInfo> streamInfoList = streamService.list(groupId);
if (CollectionUtils.isEmpty(streamInfoList)) {
LOGGER.warn("No stream info found for group {}, skip submit offline job", groupId);
return false;
}
return scheduleOperator.submitOfflineJob(groupId, streamInfoList);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -125,12 +125,6 @@ public ListenerResult listen(WorkflowContext context) throws WorkflowListenerExc

try {
for (InlongStreamInfo streamInfo : streamInfos) {
// do not build sort config if the group mode is offline
if (InlongConstants.DATASYNC_OFFLINE_MODE.equals(groupInfo.getInlongGroupMode())) {
LOGGER.info("no need to build sort config for groupId={} streamId={} as the mode is offline",
groupId, streamInfo.getInlongStreamId());
continue;
}
List<StreamSink> sinkList = streamInfo.getSinkList();
if (CollectionUtils.isEmpty(sinkList)) {
continue;
Expand All @@ -144,7 +138,7 @@ public ListenerResult listen(WorkflowContext context) throws WorkflowListenerExc
}
} catch (Exception e) {
String msg = String.format("failed to build sort config for groupId=%s, ", groupId);
LOGGER.error(msg + "streamInfos=" + streamInfos, e);
LOGGER.error("{} streamInfos={}", msg, streamInfos, e);
throw new WorkflowListenerException(msg + e.getMessage());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@

import org.apache.inlong.manager.pojo.schedule.ScheduleInfo;
import org.apache.inlong.manager.pojo.schedule.ScheduleInfoRequest;
import org.apache.inlong.manager.pojo.stream.InlongStreamInfo;

import java.util.List;

/**
* Operator for schedule. Including:
Expand Down Expand Up @@ -92,4 +95,12 @@ public interface ScheduleOperator {
* @Return whether succeed
* */
Boolean handleGroupApprove(String groupId);

/**
* Start offline sync job when the schedule instance callback.
* @param groupId groupId to start offline job
* @param streamInfoList stream list to start offline job
* @Return whether succeed
* */
Boolean submitOfflineJob(String groupId, List<InlongStreamInfo> streamInfoList);
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,21 +18,26 @@
package org.apache.inlong.manager.service.schedule;

import org.apache.inlong.manager.common.consts.InlongConstants;
import org.apache.inlong.manager.common.exceptions.BusinessException;
import org.apache.inlong.manager.common.util.CommonBeanUtils;
import org.apache.inlong.manager.dao.entity.InlongGroupExtEntity;
import org.apache.inlong.manager.dao.mapper.InlongGroupExtEntityMapper;
import org.apache.inlong.manager.dao.mapper.ScheduleEntityMapper;
import org.apache.inlong.manager.pojo.schedule.ScheduleInfo;
import org.apache.inlong.manager.pojo.schedule.ScheduleInfoRequest;
import org.apache.inlong.manager.pojo.stream.InlongStreamInfo;
import org.apache.inlong.manager.schedule.ScheduleClientFactory;
import org.apache.inlong.manager.schedule.ScheduleEngineClient;
import org.apache.inlong.manager.workflow.processor.OfflineJobOperator;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

import java.util.List;
import java.util.stream.Collectors;

import static org.apache.inlong.manager.common.enums.ScheduleStatus.APPROVED;
import static org.apache.inlong.manager.common.enums.ScheduleStatus.REGISTERED;
import static org.apache.inlong.manager.common.enums.ScheduleStatus.UPDATED;
Expand All @@ -49,10 +54,10 @@ public class ScheduleOperatorImpl implements ScheduleOperator {
private InlongGroupExtEntityMapper groupExtMapper;

@Autowired
private ScheduleEntityMapper scheduleMapper;
private ScheduleClientFactory scheduleClientFactory;

@Autowired
private ScheduleClientFactory scheduleClientFactory;
private OfflineJobOperator offlineJobOperator;

private ScheduleEngineClient scheduleEngineClient;

Expand All @@ -75,7 +80,7 @@ private void registerScheduleInfoForApprovedGroup(ScheduleInfo scheduleInfo, Str
String groupId = scheduleInfo.getInlongGroupId();
InlongGroupExtEntity scheduleStatusExt =
groupExtMapper.selectByUniqueKey(groupId, InlongConstants.REGISTER_SCHEDULE_STATUS);
if (InlongConstants.REGISTERED.equalsIgnoreCase(scheduleStatusExt.getKeyValue())) {
if (scheduleStatusExt != null && InlongConstants.REGISTERED.equalsIgnoreCase(scheduleStatusExt.getKeyValue())) {
// change schedule state to approved
scheduleService.updateStatus(scheduleInfo.getInlongGroupId(), APPROVED, operator);
registerToScheduleEngine(scheduleInfo, operator, false);
Expand Down Expand Up @@ -124,8 +129,10 @@ public Boolean updateOpt(ScheduleInfoRequest request, String operator) {
@Override
@Transactional(rollbackFor = Throwable.class)
public Boolean updateAndRegister(ScheduleInfoRequest request, String operator) {
updateOpt(request, operator);
return registerToScheduleEngine(CommonBeanUtils.copyProperties(request, ScheduleInfo::new), operator, true);
if (updateOpt(request, operator)) {
return registerToScheduleEngine(CommonBeanUtils.copyProperties(request, ScheduleInfo::new), operator, true);
}
return false;
}

/**
Expand Down Expand Up @@ -172,4 +179,18 @@ public Boolean handleGroupApprove(String groupId) {
return registerToScheduleEngine(scheduleInfo, null, false);
}

@Override
public Boolean submitOfflineJob(String groupId, List<InlongStreamInfo> streamInfoList) {
try {
offlineJobOperator.submitOfflineJob(groupId, streamInfoList);
LOGGER.info("Submit offline job for group {} and stream list {} success.", groupId,
streamInfoList.stream().map(InlongStreamInfo::getName).collect(Collectors.toList()));
} catch (Exception e) {
String errorMsg = String.format("Submit offline job failed for groupId=%s", groupId);
LOGGER.error(errorMsg, e);
throw new BusinessException(errorMsg);
}
return true;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -252,4 +252,11 @@ public Response<Boolean> finishTagSwitch(@PathVariable String groupId) {
return Response.success(groupService.finishTagSwitch(groupId));
}

@RequestMapping(value = "/group/submitOfflineJob/{groupId}", method = RequestMethod.POST)
@ApiOperation(value = "Submitting inlong offline job process")
@ApiImplicitParam(name = "groupId", value = "Inlong group id", dataTypeClass = String.class)
public Response<WorkflowResult> submitOfflineJob(@PathVariable String groupId) {
String operator = LoginUserUtils.getLoginUser().getName();
return Response.success(groupProcessOperation.restartProcess(groupId, operator));
aloyszhang marked this conversation as resolved.
Show resolved Hide resolved
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,4 @@ audit.admin.ids=3,4,5,6
audit.user.ids=3,4,5,6

# tencent cloud log service endpoint, The Operator cls resource by it
cls.manager.endpoint=127.0.0.1

# schedule engine type
# support none(no scheduler) and quartz(quartz scheduler), default is none
inlong.schedule.engine=none
cls.manager.endpoint=127.0.0.1
Loading
Loading