Skip to content

Commit

Permalink
[INLONG-11103][Manager] Data add task supports filtering based on stream
Browse files Browse the repository at this point in the history
  • Loading branch information
fuweng11 committed Sep 14, 2024
1 parent 807717a commit 5f82188
Show file tree
Hide file tree
Showing 8 changed files with 29 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,18 @@
import lombok.EqualsAndHashCode;
import lombok.ToString;

import java.util.List;

@Data
@ToString(callSuper = true)
@EqualsAndHashCode(callSuper = true)
@JsonTypeDefine(value = SourceType.FILE)
@ApiModel(value = "File data add task request")
public class FileDataAddTaskRequest extends DataAddTaskRequest {

@ApiModelProperty("filterStreams")
private List<String> filterStreams;

@ApiModelProperty("Start time")
private Long startTime;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,9 @@ public class FileSource extends StreamSource {
@ApiModelProperty("End time")
private Long endTime;

@ApiModelProperty("filterStreams")
private List<String> filterStreams;

public FileSource() {
this.setSourceType(SourceType.FILE);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,9 @@ public class FileSourceDTO {
@ApiModelProperty(value = "Audit version")
private String auditVersion;

@ApiModelProperty("filterStreams")
private List<String> filterStreams;

public static FileSourceDTO getFromRequest(@NotNull FileSourceRequest fileSourceRequest, String extParams) {
FileSourceDTO dto = StringUtils.isNotBlank(extParams)
? FileSourceDTO.getFromJson(extParams)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,9 @@ public class FileSourceRequest extends SourceRequest {
@ApiModelProperty("End time")
private Long endTime;

@ApiModelProperty("filterStreams")
private List<String> filterStreams;

public FileSourceRequest() {
this.setSourceType(SourceType.FILE);
this.setSerializationType(DataFormat.CSV.getName());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,6 @@ default Boolean updateAfterApprove(String operator) {
* @param operator Operator's name.
* @return source id list after saving.
*/
List<Integer> batchAddDataAddTask(String groupId, String streamId, List<DataAddTaskRequest> requestList,
List<Integer> batchAddDataAddTask(String groupId, List<DataAddTaskRequest> requestList,
String operator);
}
Original file line number Diff line number Diff line change
Expand Up @@ -537,10 +537,10 @@ public Integer addDataAddTask(DataAddTaskRequest request, String operator) {
}

@Override
public List<Integer> batchAddDataAddTask(String groupId, String streamId, List<DataAddTaskRequest> requestList,
public List<Integer> batchAddDataAddTask(String groupId, List<DataAddTaskRequest> requestList,
String operator) {
List<Integer> result = new ArrayList<>();
String auditVersion = String.valueOf(sourceMapper.selectDataAddTaskCount(groupId, streamId));
String auditVersion = String.valueOf(sourceMapper.selectDataAddTaskCount(groupId, null));
for (DataAddTaskRequest request : requestList) {
request.setAuditVersion(auditVersion);
int id = addDataAddTask(request, operator);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,13 +119,19 @@ public Integer addDataAddTask(DataAddTaskRequest request, String operator) {
dto.setStartTime(sourceRequest.getStartTime());
dto.setEndTime(sourceRequest.getEndTime());
dto.setRetry(true);
dto.setAuditVersion(request.getAuditVersion());
dto.setFilterStreams(sourceRequest.getFilterStreams());
StreamSourceEntity dataAddTaskEntity =
CommonBeanUtils.copyProperties(sourceEntity, StreamSourceEntity::new);
dataAddTaskEntity.setId(null);
dataAddTaskEntity.setSourceName(sourceEntity.getSourceName() + "-" + (dataAddTaskSize + 1));
dataAddTaskEntity.setExtParams(objectMapper.writeValueAsString(dto));
dataAddTaskEntity.setTaskMapId(sourceEntity.getId());
return sourceMapper.insert(dataAddTaskEntity);
Integer id = sourceMapper.insert(dataAddTaskEntity);
SourceRequest dataAddTaskRequest =
CommonBeanUtils.copyProperties(dataAddTaskEntity, SourceRequest::new, true);
updateAgentTaskConfig(dataAddTaskRequest, operator);
return id;
} catch (Exception e) {
LOGGER.error("serialize extParams of File SourceDTO failure: ", e);
throw new BusinessException(ErrorCodeEnum.SOURCE_INFO_INCORRECT,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,16 +130,13 @@ public Response<Boolean> forceDelete(@RequestParam String inlongGroupId, @Reques
sourceService.forceDelete(inlongGroupId, inlongStreamId, LoginUserUtils.getLoginUser().getName()));
}

@RequestMapping(value = "/source/addDataAddTask/{groupId}/{streamId}", method = RequestMethod.POST)
@RequestMapping(value = "/source/addDataAddTask/{groupId}", method = RequestMethod.POST)
@ApiOperation(value = "Add supplementary recording task for stream source")
@ApiImplicitParams({
@ApiImplicitParam(name = "groupId", dataTypeClass = String.class, required = true),
@ApiImplicitParam(name = "streamId", dataTypeClass = String.class, required = true)
})
public Response<List<Integer>> addSub(@PathVariable String groupId, @PathVariable String streamId,
@ApiImplicitParam(name = "groupId", dataTypeClass = String.class, required = true)
public Response<List<Integer>> addSub(@PathVariable String groupId,
@RequestBody List<DataAddTaskRequest> requestList) {
return Response.success(sourceService.batchAddDataAddTask(groupId, streamId, requestList,
LoginUserUtils.getLoginUser().getName()));
return Response.success(
sourceService.batchAddDataAddTask(groupId, requestList, LoginUserUtils.getLoginUser().getName()));
}

}

0 comments on commit 5f82188

Please sign in to comment.