Skip to content

Commit

Permalink
[INLONG-10561][Manager] Support configrations for bounded source (apa…
Browse files Browse the repository at this point in the history
…che#10571)

Co-authored-by: Aloys Zhang <[email protected]>
  • Loading branch information
aloyszhang and Aloys Zhang committed Jul 9, 2024
1 parent 63c4a74 commit 3666e3f
Show file tree
Hide file tree
Showing 19 changed files with 250 additions and 44 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.apache.inlong.manager.pojo.group.InlongGroupResetRequest;
import org.apache.inlong.manager.pojo.group.InlongGroupTopicInfo;
import org.apache.inlong.manager.pojo.group.InlongGroupTopicRequest;
import org.apache.inlong.manager.pojo.schedule.OfflineJobSubmitRequest;
import org.apache.inlong.manager.pojo.sort.SortStatusInfo;
import org.apache.inlong.manager.pojo.sort.SortStatusRequest;
import org.apache.inlong.manager.pojo.workflow.WorkflowResult;
Expand Down Expand Up @@ -320,8 +321,8 @@ public Boolean finishTagSwitch(String groupId) {
return response.getData();
}

public Boolean submitOfflineJob(String groupId) {
Response<Boolean> responseBody = ClientUtils.executeHttpCall(inlongGroupApi.submitOfflineJob(groupId));
public Boolean submitOfflineJob(OfflineJobSubmitRequest request) {
Response<Boolean> responseBody = ClientUtils.executeHttpCall(inlongGroupApi.submitOfflineJob(request));
ClientUtils.assertRespSuccess(responseBody);
return responseBody.getData();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.apache.inlong.manager.pojo.group.InlongGroupResetRequest;
import org.apache.inlong.manager.pojo.group.InlongGroupTopicInfo;
import org.apache.inlong.manager.pojo.group.InlongGroupTopicRequest;
import org.apache.inlong.manager.pojo.schedule.OfflineJobSubmitRequest;
import org.apache.inlong.manager.pojo.workflow.WorkflowResult;

import retrofit2.Call;
Expand Down Expand Up @@ -99,6 +100,6 @@ public interface InlongGroupApi {
@GET("group/switch/finish/{groupId}")
Call<Response<Boolean>> finishTagSwitch(@Path("groupId") String groupId);

@POST("group/submitOfflineJob/{groupId}")
Call<Response<Boolean>> submitOfflineJob(@Path("groupId") String groupId);
@POST("group/submitOfflineJob")
Call<Response<Boolean>> submitOfflineJob(@Body OfflineJobSubmitRequest request);
}
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,10 @@ public class InlongConstants {
public static final String RUNTIME_EXECUTION_MODE_STREAM = "stream";
public static final String RUNTIME_EXECUTION_MODE_BATCH = "batch";

public static final String BOUNDARY_TYPE = "BOUNDARY_TYPE";
public static final String LOWER_BOUNDARY = "LOWER_BOUNDARY";
public static final String UPPER_BOUNDARY = "UPPER_BOUNDARY";

public static final Integer DISABLE_ZK = 0;
public static final Integer ENABLE_ZK = 1;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,10 @@ public enum ErrorCodeEnum {
SCHEDULE_ENGINE_NOT_SUPPORTED(1702, "Schedule engine type not supported"),
SCHEDULE_STATUS_TRANSITION_NOT_ALLOWED(1703, "Schedule status transition is not allowed"),

BOUNDED_SOURCE_TYPE_NOT_SUPPORTED(1801, "Bounded source type %s not supported"),
BOUNDARY_TYPE_NOT_SUPPORTED(1802, "Boundary type %s not supported"),
BOUNDARIES_NOT_FOUND(1803, "Boundaries not found"),

WORKFLOW_EXE_FAILED(4000, "Workflow execution exception"),
WORKFLOW_APPROVER_NOT_FOUND(4001, "Workflow approver does not exist/no operation authority"),
WORKFLOW_DELETE_RECORD_FAILED(4002, "Workflow delete record failure"),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.inlong.manager.plugin.flink;

import org.apache.inlong.manager.common.consts.InlongConstants;
import org.apache.inlong.manager.common.exceptions.BusinessException;
import org.apache.inlong.manager.plugin.flink.dto.FlinkConfig;
import org.apache.inlong.manager.plugin.flink.dto.FlinkInfo;
Expand Down Expand Up @@ -258,8 +259,16 @@ private String[] genProgramArgs(FlinkInfo flinkInfo, FlinkConfig flinkConfig) {
list.add(flinkInfo.getLocalConfPath());
list.add("-checkpoint.interval");
list.add("60000");
list.add("-runtime.execution.mode");
list.add(flinkInfo.getRuntimeExecutionMode());
if (InlongConstants.RUNTIME_EXECUTION_MODE_BATCH.equalsIgnoreCase(flinkInfo.getRuntimeExecutionMode())) {
list.add("-runtime.execution.mode");
list.add(flinkInfo.getRuntimeExecutionMode());
list.add("-source.boundary.type");
list.add(flinkInfo.getBoundaryType());
list.add("-source.lower.boundary");
list.add(flinkInfo.getLowerBoundary());
list.add("-source.upper.boundary");
list.add(flinkInfo.getUpperBoundary());
}
return list.toArray(new String[0]);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,4 +54,10 @@ public class FlinkInfo {
private String exceptionMsg;

private String runtimeExecutionMode;

private String boundaryType;

private String lowerBoundary;

private String upperBoundary;
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.inlong.manager.plugin.offline;

import org.apache.inlong.common.bounded.Boundaries;
import org.apache.inlong.manager.pojo.stream.InlongStreamInfo;
import org.apache.inlong.manager.workflow.processor.OfflineJobOperator;

Expand All @@ -30,7 +31,8 @@
public class FlinkOfflineJobOperator implements OfflineJobOperator {

@Override
public void submitOfflineJob(String groupId, List<InlongStreamInfo> streamInfoList) throws Exception {
submitFlinkJobs(groupId, streamInfoList, true);
public void submitOfflineJob(String groupId, List<InlongStreamInfo> streamInfoList, Boundaries boundaries)
throws Exception {
submitFlinkJobs(groupId, streamInfoList, true, boundaries);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.inlong.manager.plugin.util;

import org.apache.inlong.common.bounded.Boundaries;
import org.apache.inlong.manager.common.consts.InlongConstants;
import org.apache.inlong.manager.common.consts.SinkType;
import org.apache.inlong.manager.common.util.JsonUtils;
Expand Down Expand Up @@ -222,11 +223,11 @@ public static FlinkConfig getFlinkConfigFromFile() throws Exception {

public static ListenerResult submitFlinkJobs(String groupId, List<InlongStreamInfo> streamInfoList)
throws Exception {
return submitFlinkJobs(groupId, streamInfoList, false);
return submitFlinkJobs(groupId, streamInfoList, false, null);
}

public static ListenerResult submitFlinkJobs(String groupId, List<InlongStreamInfo> streamInfoList,
boolean isBatchJob) throws Exception {
boolean isBatchJob, Boundaries boundaries) throws Exception {
int sinkCount = streamInfoList.stream()
.map(s -> s.getSinkList() == null ? 0 : s.getSinkList().size())
.reduce(0, Integer::sum);
Expand All @@ -240,7 +241,8 @@ public static ListenerResult submitFlinkJobs(String groupId, List<InlongStreamIn
List<ListenerResult> listenerResults = new ArrayList<>();
for (InlongStreamInfo streamInfo : streamInfoList) {
listenerResults.add(
FlinkUtils.submitFlinkJob(streamInfo, FlinkUtils.genFlinkJobName(streamInfo), isBatchJob));
FlinkUtils.submitFlinkJob(
streamInfo, FlinkUtils.genFlinkJobName(streamInfo), isBatchJob, boundaries));
}

// only one stream in group for now
Expand All @@ -254,11 +256,11 @@ public static ListenerResult submitFlinkJobs(String groupId, List<InlongStreamIn
}

public static ListenerResult submitFlinkJob(InlongStreamInfo streamInfo, String jobName) throws Exception {
return submitFlinkJob(streamInfo, jobName, false);
return submitFlinkJob(streamInfo, jobName, false, null);
}

public static ListenerResult submitFlinkJob(InlongStreamInfo streamInfo, String jobName, boolean isBatchJob)
throws Exception {
public static ListenerResult submitFlinkJob(InlongStreamInfo streamInfo, String jobName,
boolean isBatchJob, Boundaries boundaries) throws Exception {
List<StreamSink> sinkList = streamInfo.getSinkList();
List<String> sinkTypes = sinkList.stream().map(StreamSink::getSinkType).collect(Collectors.toList());
if (CollectionUtils.isEmpty(sinkList) || !SinkType.containSortFlinkSink(sinkTypes)) {
Expand Down Expand Up @@ -298,6 +300,9 @@ public static ListenerResult submitFlinkJob(InlongStreamInfo streamInfo, String
flinkInfo.setInlongStreamInfoList(Collections.singletonList(streamInfo));
if (isBatchJob) {
flinkInfo.setRuntimeExecutionMode(RUNTIME_EXECUTION_MODE_BATCH);
flinkInfo.setBoundaryType(boundaries.getBoundaryType().getType());
flinkInfo.setLowerBoundary(boundaries.getLowerBound());
flinkInfo.setUpperBoundary(boundaries.getUpperBound());
} else {
flinkInfo.setRuntimeExecutionMode(RUNTIME_EXECUTION_MODE_STREAM);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.inlong.manager.pojo.schedule;

import io.swagger.annotations.ApiModelProperty;
import lombok.Data;

import javax.validation.constraints.NotNull;

@Data
public class OfflineJobSubmitRequest {

@ApiModelProperty("Inlong Group ID")
@NotNull
private String groupId;

@ApiModelProperty("Source boundary type, TIME and OFFSET are supported")
@NotNull
private String boundaryType;

@ApiModelProperty("The lower bound for bounded source")
@NotNull
private String lowerBoundary;

@ApiModelProperty("The upper bound for bounded source")
@NotNull
private String upperBoundary;
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,14 @@

package org.apache.inlong.manager.schedule.quartz;

import org.apache.inlong.common.bounded.BoundaryType;
import org.apache.inlong.manager.client.api.ClientConfiguration;
import org.apache.inlong.manager.client.api.impl.InlongClientImpl;
import org.apache.inlong.manager.client.api.inner.client.ClientFactory;
import org.apache.inlong.manager.client.api.inner.client.InlongGroupClient;
import org.apache.inlong.manager.client.api.util.ClientUtils;
import org.apache.inlong.manager.common.auth.DefaultAuthentication;
import org.apache.inlong.manager.pojo.schedule.OfflineJobSubmitRequest;

import lombok.AllArgsConstructor;
import lombok.Data;
Expand All @@ -35,10 +37,11 @@
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;

import static org.apache.inlong.manager.schedule.util.ScheduleUtils.END_TIME;
import static org.apache.inlong.manager.schedule.util.ScheduleUtils.MANAGER_HOST;
import static org.apache.inlong.manager.schedule.util.ScheduleUtils.MANAGER_PORT;
import static org.apache.inlong.manager.schedule.util.ScheduleUtils.SECRETE_ID;
import static org.apache.inlong.manager.schedule.util.ScheduleUtils.SECRETE_KEY;
import static org.apache.inlong.manager.schedule.util.ScheduleUtils.PASSWORD;
import static org.apache.inlong.manager.schedule.util.ScheduleUtils.USERNAME;

@Data
@NoArgsConstructor
Expand All @@ -49,31 +52,49 @@ public class QuartzOfflineSyncJob implements Job {
private static final Logger LOGGER = LoggerFactory.getLogger(QuartzOfflineSyncJob.class);

private volatile InlongGroupClient groupClient;
private long endTime;

@Override
public void execute(JobExecutionContext context) throws JobExecutionException {
LOGGER.info("QuartzOfflineSyncJob run once");
JobDataMap jobDataMap = context.getJobDetail().getJobDataMap();
initGroupClientIfNeeded(jobDataMap);

String inlongGroupId = context.getJobDetail().getKey().getName();
LOGGER.info("Starting submit offline job for group {}", inlongGroupId);
if (groupClient.submitOfflineJob(inlongGroupId)) {
LOGGER.info("Successfully submitting offline job for group {}", inlongGroupId);
} else {
LOGGER.warn("Failed to submit offline job for group {}", inlongGroupId);
long lowerBoundary = context.getScheduledFireTime().getTime();
long upperBoundary = context.getNextFireTime() == null ? endTime : context.getNextFireTime().getTime();

OfflineJobSubmitRequest request = new OfflineJobSubmitRequest();
request.setGroupId(inlongGroupId);
request.setBoundaryType(BoundaryType.TIME.getType());
request.setLowerBoundary(String.valueOf(lowerBoundary));
request.setUpperBoundary(String.valueOf(upperBoundary));
LOGGER.info("Starting submit offline job for group: {}, with lower boundary: {} and upper boundary: {}",
inlongGroupId, lowerBoundary, upperBoundary);

try {
if (groupClient.submitOfflineJob(request)) {
LOGGER.info("Successfully submitting offline job for group {}", inlongGroupId);
} else {
LOGGER.warn("Failed to submit offline job for group {}", inlongGroupId);
}
} catch (Exception e) {
LOGGER.error("Exception to submit offline job for group {}, error msg: {}", inlongGroupId, e.getMessage());
}

}

private void initGroupClientIfNeeded(JobDataMap jobDataMap) {
if (groupClient == null) {
String host = (String) jobDataMap.get(MANAGER_HOST);
int port = (int) jobDataMap.get(MANAGER_PORT);
String secreteId = (String) jobDataMap.get(SECRETE_ID);
String secreteKey = (String) jobDataMap.get(SECRETE_KEY);
LOGGER.info("Initializing Inlong group client, with host: {}, port: {}, userName : {}",
host, port, secreteId);
String username = (String) jobDataMap.get(USERNAME);
String password = (String) jobDataMap.get(PASSWORD);
endTime = (long) jobDataMap.get(END_TIME);
LOGGER.info("Initializing Inlong group client, with host: {}, port: {}, userName : {}, endTime: {}",
host, port, username, endTime);
ClientConfiguration configuration = new ClientConfiguration();
configuration.setAuthentication(new DefaultAuthentication(secreteId, secreteKey));
configuration.setAuthentication(new DefaultAuthentication(username, password));
String serviceUrl = host + ":" + port;
InlongClientImpl inlongClient = new InlongClientImpl(serviceUrl, configuration);
ClientFactory clientFactory = ClientUtils.getClientFactory(inlongClient.getConfiguration());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,17 +48,20 @@ public class ScheduleUtils {

public static final String MANAGER_HOST = "HOST";
public static final String MANAGER_PORT = "PORT";
public static final String SECRETE_ID = "SECRETE_ID";
public static final String SECRETE_KEY = "SECRETE_KEY";
public static final String USERNAME = "USERNAME";
public static final String PASSWORD = "PASSWORD";
public static final String END_TIME = "END_TIME";

public static JobDetail genQuartzJobDetail(ScheduleInfo scheduleInfo, Class<? extends Job> clz,
String host, Integer port, String secreteId, String secreteKey) {
String host, Integer port, String username, String password) {

return JobBuilder.newJob(clz)
.withIdentity(scheduleInfo.getInlongGroupId())
.usingJobData(MANAGER_HOST, host)
.usingJobData(MANAGER_PORT, port)
.usingJobData(SECRETE_ID, secreteId)
.usingJobData(SECRETE_KEY, secreteKey)
.usingJobData(USERNAME, username)
.usingJobData(PASSWORD, password)
.usingJobData(END_TIME, scheduleInfo.getEndTime().getTime())
.build();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@

import java.sql.Timestamp;

import static org.apache.inlong.manager.schedule.ScheduleUnit.ONE_ROUND;
import static org.apache.inlong.manager.schedule.ScheduleUnit.SECOND;

public class BaseScheduleTest {
Expand All @@ -42,6 +43,10 @@ public ScheduleInfo genDefaultScheduleInfo() {
return genNormalScheduleInfo(GROUP_ID, SECOND.getUnit(), DEFAULT_INTERVAL, DEFAULT_SPAN_IN_MS);
}

public ScheduleInfo genOneroundScheduleInfo() {
return genNormalScheduleInfo(GROUP_ID, ONE_ROUND.getUnit(), DEFAULT_INTERVAL, DEFAULT_SPAN_IN_MS);
}

public ScheduleInfo genNormalScheduleInfo(String groupId, String scheduleUnit, int scheduleInterval,
long timeSpanInMs) {
ScheduleInfo scheduleInfo = new ScheduleInfo();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.apache.inlong.manager.pojo.group.InlongGroupRequest;
import org.apache.inlong.manager.pojo.group.InlongGroupTopicInfo;
import org.apache.inlong.manager.pojo.group.InlongGroupTopicRequest;
import org.apache.inlong.manager.pojo.schedule.OfflineJobSubmitRequest;
import org.apache.inlong.manager.pojo.user.UserInfo;

import javax.validation.Valid;
Expand Down Expand Up @@ -219,10 +220,10 @@ void updateAfterApprove(
List<GroupFullInfo> getGroupByBackUpClusterTag(String clusterTag);

/**
* Submitting offline job for the given group.
* @param groupId the inlong group to submit offline job
* Submitting offline job.
* @param request request to submit offline sync job
*
* */
Boolean submitOfflineJob(String groupId);
Boolean submitOfflineJob(OfflineJobSubmitRequest request);

}
Loading

0 comments on commit 3666e3f

Please sign in to comment.