Skip to content

Commit

Permalink
[INLONG-11487][Manager] Support adding data add tasks based on the so…
Browse files Browse the repository at this point in the history
…urce ID
  • Loading branch information
fuweng11 committed Nov 12, 2024
1 parent eb55831 commit 936f095
Show file tree
Hide file tree
Showing 9 changed files with 50 additions and 29 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.hibernate.validator.constraints.Length;

import javax.validation.constraints.NotBlank;
import javax.validation.constraints.NotNull;

import java.util.List;

Expand All @@ -39,7 +40,7 @@ public class DataAddTaskRequest {
@NotBlank(message = "inlongGroupId cannot be blank")
private String groupId;

@ApiModelProperty(value = "Source ID", hidden = true)
@ApiModelProperty(value = "Source ID, When the source ID is not null, data add task entries based on the source ID")
private Integer sourceId;

@ApiModelProperty(value = "Agent ip List")
Expand All @@ -53,4 +54,14 @@ public class DataAddTaskRequest {
@ApiModelProperty(value = "Audit version", hidden = true)
private String auditVersion;

@ApiModelProperty(value = "Data start time")
private String dataTimeFrom;

@ApiModelProperty(value = "Data end time")
private String dataTimeTo;

@ApiModelProperty(value = "Increase Audit Version")
@NotNull(message = "IncreaseAuditVersion cannot be null")
private Boolean increaseAuditVersion;

}
Original file line number Diff line number Diff line change
Expand Up @@ -39,10 +39,4 @@ public class FileDataAddTaskRequest extends DataAddTaskRequest {
@ApiModelProperty("filterStreams")
private List<String> filterStreams;

@ApiModelProperty("Start time")
private Long startTime;

@ApiModelProperty("End time")
private Long endTime;

}
Original file line number Diff line number Diff line change
Expand Up @@ -89,11 +89,11 @@ public class FileSource extends StreamSource {
@ApiModelProperty("Whether retry")
private Boolean retry;

@ApiModelProperty("Start time")
private Long startTime;
@ApiModelProperty(value = "Data start time")
private String dataTimeFrom;

@ApiModelProperty("End time")
private Long endTime;
@ApiModelProperty(value = "Data end time")
private String dataTimeTo;

@ApiModelProperty("filterStreams")
private List<String> filterStreams;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,11 +92,11 @@ public class FileSourceDTO {
@ApiModelProperty("Whether retry")
private Boolean retry = false;

@ApiModelProperty("Start time")
private Long startTime = 0L;
@ApiModelProperty(value = "Data start time")
private String dataTimeFrom;

@ApiModelProperty("End time")
private Long endTime = 0L;
@ApiModelProperty(value = "Data end time")
private String dataTimeTo;

@ApiModelProperty(value = "Audit version")
private String auditVersion;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,12 +84,11 @@ public class FileSourceRequest extends SourceRequest {
@ApiModelProperty("Whether retry")
private Boolean retry;

@ApiModelProperty("Start time")
private Long startTime;

@ApiModelProperty("End time")
private Long endTime;
@ApiModelProperty(value = "Data start time")
private String dataTimeFrom;

@ApiModelProperty(value = "Data end time")
private String dataTimeTo;
@ApiModelProperty("filterStreams")
private List<String> filterStreams;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ public class AgentClusterNodeInstallOperator implements InlongClusterNodeInstall

public static final String INSTALLER_CONF_PATH = "/conf/installer.properties";
public static final String INSTALLER_START_CMD = "/bin/installer.sh start";
public static final String CRONTAB_START_CMD = "/bin/crontab.sh";
public static final String INSTALLER_RESTART_CMD = "/bin/installer.sh restart";
public static final String INSTALLER_STOP_CMD = "/bin/installer.sh restart";
public static final String AGENT_MANAGER_AUTH_SECRET_ID = "agent.manager.auth.secretId";
Expand Down Expand Up @@ -115,6 +116,8 @@ public boolean install(ClusterNodeRequest clusterNodeRequest, String operator) {
deployInstaller(request, operator);
String startCmd = agentInstallPath + INSTALLER_START_CMD;
commandExecutor.execRemote(request, startCmd);
String crontabStartCmd = agentInstallPath + CRONTAB_START_CMD;
commandExecutor.execRemote(request, crontabStartCmd);
clusterNodeEntityMapper.updateOperateLogById(clusterNodeRequest.getId(),
NodeStatus.INSTALL_SUCCESS.getStatus(), currentTime + InlongConstants.BLANK + "success to install");
} catch (Exception e) {
Expand Down Expand Up @@ -149,6 +152,8 @@ public boolean reInstall(ClusterNodeRequest clusterNodeRequest, String operator)
commandExecutor.cpDir(request, agentInstallTempPath + "/modules.json", agentInstallPath + "/conf");
String reStartCmd = agentInstallPath + INSTALLER_RESTART_CMD;
commandExecutor.execRemote(request, reStartCmd);
String crontabStartCmd = agentInstallPath + CRONTAB_START_CMD;
commandExecutor.execRemote(request, crontabStartCmd);
clusterNodeEntityMapper.updateOperateLogById(clusterNodeRequest.getId(), NodeStatus.NORMAL.getStatus(),
currentTime + InlongConstants.BLANK + "success to reinstall");
} catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,8 @@ public CommandResult rmDir(AgentClusterNodeRequest clusterNodeRequest, String pa
@Override
public CommandResult cpDir(AgentClusterNodeRequest clusterNodeRequest, String sourcePath, String targetPath)
throws Exception {
return execRemote(clusterNodeRequest, "cp " + sourcePath + " " + targetPath);
return execRemote(clusterNodeRequest,
"if [ -e " + sourcePath + " ]; then cp " + sourcePath + " " + targetPath + "; fi");
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -545,12 +545,21 @@ public List<Integer> addDataAddTask(DataAddTaskRequest request, String operator)
List<String> agentIpList = request.getAgentIpList();
List<StreamSourceEntity> entityList = new ArrayList<>();
List<Integer> resultIdList = new ArrayList<>();
if (CollectionUtils.isEmpty(agentIpList)) {
entityList = sourceMapper.selectByRelatedId(request.getGroupId(), null, null);
if (request.getSourceId() != null) {
StreamSourceEntity entity = sourceMapper.selectById(request.getSourceId());
Preconditions.expectNotNull(entity, ErrorCodeEnum.SOURCE_INFO_NOT_FOUND);
entityList.add(entity);
} else {
for (String agentIp : agentIpList) {
List<StreamSourceEntity> sourceEntityList = sourceMapper.selectByAgentIp(agentIp);
entityList.addAll(sourceEntityList);
if (agentIpList == null) {
throw new BusinessException("Agent ip list can not null");
}
if (CollectionUtils.isEmpty(agentIpList)) {
entityList = sourceMapper.selectByRelatedId(request.getGroupId(), null, null);
} else {
for (String agentIp : agentIpList) {
List<StreamSourceEntity> sourceEntityList = sourceMapper.selectByAgentIp(agentIp);
entityList.addAll(sourceEntityList);
}
}
}
for (StreamSourceEntity sourceEntity : entityList) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,10 +114,12 @@ public Integer addDataAddTask(DataAddTaskRequest request, String operator) {
try {
List<StreamSourceEntity> dataAddTaskList = sourceMapper.selectByTaskMapId(sourceEntity.getId());
FileSourceDTO dto = FileSourceDTO.getFromJson(sourceEntity.getExtParams());
dto.setStartTime(sourceRequest.getStartTime());
dto.setEndTime(sourceRequest.getEndTime());
dto.setDataTimeFrom(sourceRequest.getDataTimeFrom());
dto.setDataTimeTo(sourceRequest.getDataTimeTo());
dto.setRetry(true);
dto.setAuditVersion(request.getAuditVersion());
if (request.getIncreaseAuditVersion()) {
dto.setAuditVersion(request.getAuditVersion());
}
dto.setFilterStreams(sourceRequest.getFilterStreams());
StreamSourceEntity dataAddTaskEntity =
CommonBeanUtils.copyProperties(sourceEntity, StreamSourceEntity::new);
Expand Down

0 comments on commit 936f095

Please sign in to comment.