Skip to content

Commit

Permalink
[ISSUE apache#5075] update eventmesh-admin-server create task response
Browse files Browse the repository at this point in the history
  • Loading branch information
xwm1992 committed Aug 8, 2024
1 parent 791814c commit a32d930
Show file tree
Hide file tree
Showing 5 changed files with 79 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import org.apache.eventmesh.admin.server.web.service.task.TaskBizService;
import org.apache.eventmesh.common.remote.request.CreateTaskRequest;
import org.apache.eventmesh.common.remote.response.CreateTaskResponse;
import org.apache.eventmesh.common.utils.JsonUtils;

import org.springframework.beans.factory.annotation.Autowired;
Expand All @@ -37,8 +38,8 @@ public class HttpServer {

@RequestMapping(value = "/createTask", method = RequestMethod.POST)
public ResponseEntity<Object> createOrUpdateTask(@RequestBody CreateTaskRequest task) {
String uuid = taskService.createTask(task);
return ResponseEntity.ok(JsonUtils.toJSONString(Response.success(uuid)));
CreateTaskResponse createTaskResponse = taskService.createTask(task);
return ResponseEntity.ok(JsonUtils.toJSONString(Response.success(createTaskResponse)));
}

public boolean deleteTask(Long id) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
package org.apache.eventmesh.admin.server.web.service.task;

import org.apache.eventmesh.admin.server.AdminServerProperties;
import org.apache.eventmesh.admin.server.web.Response;
import org.apache.eventmesh.admin.server.web.db.entity.EventMeshJobInfo;
import org.apache.eventmesh.admin.server.web.db.entity.EventMeshTaskInfo;
import org.apache.eventmesh.admin.server.web.db.service.EventMeshTaskInfoService;
import org.apache.eventmesh.admin.server.web.pojo.JobDetail;
Expand All @@ -27,10 +29,12 @@
import org.apache.eventmesh.common.remote.datasource.DataSource;
import org.apache.eventmesh.common.remote.datasource.DataSourceType;
import org.apache.eventmesh.common.remote.request.CreateTaskRequest;
import org.apache.eventmesh.common.remote.response.CreateTaskResponse;
import org.apache.eventmesh.common.utils.JsonUtils;

import org.apache.commons.lang3.StringUtils;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Random;
Expand All @@ -55,17 +59,28 @@ public class TaskBizService {
@Autowired
private AdminServerProperties properties;

private static final String TYPE = "type";

private static final String DESC = "desc";

private static final String CONF_CLAZZ = "confClazz";

private static final String CONF = "conf";

private static final String REGION = "region";

@Transactional
public String createTask(CreateTaskRequest req) {
public CreateTaskResponse createTask(CreateTaskRequest req) {
String taskID = req.getTaskId();
if (StringUtils.isEmpty(taskID)) {
taskID = UUID.randomUUID().toString();
req.setTaskId(taskID);
}

String targetRegion = req.getTargetRegion();
String remoteResponse = "";
// not from other admin && target not equals with self region
if (!req.isFlag() && !StringUtils.equals(properties.getRegion(), targetRegion)) {
if (!req.isFlag() && !properties.getRegion().equals(targetRegion)) {
List<String> adminServerList = properties.getAdminServerList().get(targetRegion);
if (adminServerList == null || adminServerList.isEmpty()) {
throw new RuntimeException("No admin server available for region: " + targetRegion);
Expand All @@ -78,6 +93,7 @@ public String createTask(CreateTaskRequest req) {
if (!response.getStatusCode().is2xxSuccessful()) {
throw new RuntimeException("Failed to create task on admin server: " + targetUrl);
}
remoteResponse = response.getBody();
}

String finalTaskID = taskID;
Expand All @@ -93,7 +109,7 @@ public String createTask(CreateTaskRequest req) {
job.setUpdateUid(req.getUid());
return job;
}).collect(Collectors.toList());
jobInfoService.createJobs(jobs);

EventMeshTaskInfo taskInfo = new EventMeshTaskInfo();
taskInfo.setTaskID(finalTaskID);
taskInfo.setTaskName(req.getTaskName());
Expand All @@ -102,38 +118,58 @@ public String createTask(CreateTaskRequest req) {
taskInfo.setCreateUid(req.getUid());
taskInfo.setSourceRegion(req.getSourceRegion());
taskInfo.setTargetRegion(req.getTargetRegion());
List<EventMeshJobInfo> eventMeshJobInfoList = jobInfoService.createJobs(jobs);
taskInfoService.save(taskInfo);
return finalTaskID;
return buildCreateTaskResponse(finalTaskID, eventMeshJobInfoList, remoteResponse);
}

private JobDetail parse(CreateTaskRequest.JobDetail src) throws ClassNotFoundException {
JobDetail dst = new JobDetail();
dst.setJobDesc(src.getJobDesc());
dst.setTransportType(src.getTransportType());
dst.setSourceConnectorDesc(src.getSourceConnectorDesc());
Map<String, Object> sourceDataMap = src.getSourceDataSource();
DataSource sourceDataSource = new DataSource();
sourceDataSource.setType(DataSourceType.fromString(sourceDataMap.get("type").toString()));
sourceDataSource.setDesc((String) sourceDataMap.get("desc"));
sourceDataSource.setConfClazz((Class<? extends Config>) Class.forName(sourceDataMap.get("confClazz").toString()));
sourceDataSource.setConf(JsonUtils.parseObject(JsonUtils.toJSONString(sourceDataMap.get("conf")), sourceDataSource.getConfClazz()));
sourceDataSource.setRegion((String) sourceDataMap.get("region"));
dst.setSourceDataSource(sourceDataSource);

try {
dst.setSourceDataSource(mapToDataSource(src.getSourceDataSource()));
dst.setSinkDataSource(mapToDataSource(src.getSinkDataSource()));
} catch (ClassNotFoundException e) {
throw new RuntimeException("Failed to map data source", e);
}
dst.setSinkConnectorDesc(src.getSinkConnectorDesc());
Map<String, Object> sinkDataMap = src.getSinkDataSource();
DataSource sinkDataSource = new DataSource();
sinkDataSource.setType(DataSourceType.fromString(sinkDataMap.get("type").toString()));
sinkDataSource.setDesc((String) sinkDataMap.get("desc"));
sinkDataSource.setConfClazz((Class<? extends Config>) Class.forName(sinkDataMap.get("confClazz").toString()));
sinkDataSource.setConf(JsonUtils.parseObject(JsonUtils.toJSONString(sinkDataMap.get("conf")), sinkDataSource.getConfClazz()));
sinkDataSource.setRegion((String) sinkDataMap.get("region"));
dst.setSinkDataSource(sinkDataSource);

// full/increase/check
dst.setJobType(src.getJobType());
dst.setFromRegion(src.getFromRegion());
dst.setRunningRegion(src.getRunningRegion());
return dst;
}

private DataSource mapToDataSource(Map<String, Object> dataMap) throws ClassNotFoundException {
DataSource dataSource = new DataSource();
dataSource.setType(DataSourceType.fromString(dataMap.get(TYPE).toString()));
dataSource.setDesc((String) dataMap.get(DESC));
dataSource.setConfClazz((Class<? extends Config>) Class.forName(dataMap.get(CONF_CLAZZ).toString()));
dataSource.setConf(JsonUtils.parseObject(JsonUtils.toJSONString(dataMap.get(CONF)), dataSource.getConfClazz()));
dataSource.setRegion((String) dataMap.get(REGION));
return dataSource;
}

private CreateTaskResponse buildCreateTaskResponse(String taskId, List<EventMeshJobInfo> eventMeshJobInfoList, String remoteResponse) {
CreateTaskResponse createTaskResponse = new CreateTaskResponse();
createTaskResponse.setTaskId(taskId);
List<CreateTaskRequest.JobDetail> jobDetailList = new ArrayList<>();
if (!eventMeshJobInfoList.isEmpty()) {
for (EventMeshJobInfo eventMeshJobInfo : eventMeshJobInfoList) {
CreateTaskRequest.JobDetail jobDetail = new CreateTaskRequest.JobDetail();
jobDetail.setJobId(eventMeshJobInfo.getJobID());
jobDetail.setRunningRegion(eventMeshJobInfo.getRunningRegion());
jobDetailList.add(jobDetail);
}
}
if (!StringUtils.isEmpty(remoteResponse)) {
Response response = JsonUtils.parseObject(remoteResponse, Response.class);
CreateTaskResponse remoteCreateTaskResponse = JsonUtils.convertValue(response.getData(), CreateTaskResponse.class);
jobDetailList.addAll(remoteCreateTaskResponse.getJobIdList());
}
createTaskResponse.setJobIdList(jobDetailList);
return createTaskResponse;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,8 @@ public class CreateTaskRequest {
@Data
public static class JobDetail {

private String jobId;

private String jobDesc;

// full/increase/check
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,5 +17,17 @@

package org.apache.eventmesh.common.remote.response;

import org.apache.eventmesh.common.remote.request.CreateTaskRequest;

import java.util.List;

import lombok.Data;

@Data
public class CreateTaskResponse extends BaseRemoteResponse {

private String taskId;

private List<CreateTaskRequest.JobDetail> jobIdList;

}
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,10 @@ public class JsonUtils {
OBJECT_MAPPER.registerModule(new JavaTimeModule());
}

public static <T> T convertValue(Object fromValue, Class<T> toValueType) {
return OBJECT_MAPPER.convertValue(fromValue, toValueType);
}

public static <T> T mapToObject(Map<String, Object> map, Class<T> beanClass) {
if (map == null) {
return null;
Expand Down

0 comments on commit a32d930

Please sign in to comment.