Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[INLONG-9781][Manager] Add offline sync task type definition #9787

Merged
merged 2 commits into from
Mar 7, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ public static InlongGroupInfo createGroupInfo() {
// set enable zk, create resource, group mode, and cluster tag
pulsarInfo.setEnableZookeeper(InlongConstants.DISABLE_ZK);
pulsarInfo.setEnableCreateResource(InlongConstants.ENABLE_CREATE_RESOURCE);
pulsarInfo.setInlongGroupMode(InlongConstants.DATASYNC_MODE);
pulsarInfo.setInlongGroupMode(InlongConstants.DATASYNC_REALTIME_MODE);
pulsarInfo.setInlongClusterTag("default_cluster");

pulsarInfo.setDailyRecords(10000000);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,8 @@ public class InlongConstants {
public static final Integer DELETED_STATUS = 10;

public static final Integer STANDARD_MODE = 0;
public static final Integer DATASYNC_MODE = 1;
public static final Integer DATASYNC_REALTIME_MODE = 1;
public static final Integer DATASYNC_OFFLINE_MODE = 2;

public static final Integer DISABLE_ZK = 0;
public static final Integer ENABLE_ZK = 1;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,18 @@ public enum GroupMode {
STANDARD("standard"),

/**
* DataSync mode(only Data Synchronization): group init only with sort in InLong Cluster
* StreamSource -> Sort -> StreamSink
* DataSync mode(only Data Synchronization): real-time data sync in stream way, group init only with
* sort in InLong Cluster.
* StreamSource -> Sort -> Sink
*/
DATASYNC("datasync");
DATASYNC("datasync"),

/**
* DataSync mode(only Data Synchronization): offline data sync in batch way, group init only with sort
* in InLong Cluster.
* BatchSource -> Sort -> Sink
*/
DATASYNC_BATCH("datasync_offline");

@Getter
private final String mode;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ public boolean accept(WorkflowContext workflowContext) {
}

log.info("add startup group listener for groupId [{}]", groupId);
return InlongConstants.DATASYNC_MODE.equals(groupProcessForm.getGroupInfo().getInlongGroupMode());
return InlongConstants.DATASYNC_REALTIME_MODE.equals(groupProcessForm.getGroupInfo().getInlongGroupMode());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,8 @@ public abstract class InlongGroupInfo extends BaseInlongGroup {
@ApiModelProperty(value = "Whether to enable create resource? 0: disable, 1: enable")
private Integer enableCreateResource;

@ApiModelProperty(value = "Standard mode(include Data Ingestion and Synchronization): 0, DataSync mode(only Data Synchronization): 1")
@ApiModelProperty(value = "Standard mode(include Data Ingestion and Synchronization): 0, DataSync mode(only Data Synchronization, real-time data sync in stream way): 1,"
+ " DataSync mode(only Data Synchronization, offline data sync in batch way): 2")
private Integer inlongGroupMode;

@ApiModelProperty(value = "Data report type, default is 0.\n"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,8 @@ public class InlongGroupPageRequest extends PageRequest {
@ApiModelProperty(value = "The inlong cluster tag list")
private List<String> clusterTagList;

@ApiModelProperty(value = "Standard mode(include Data Ingestion and Synchronization): 0, DataSync mode(only Data Synchronization): 1")
@ApiModelProperty(value = "Standard mode(include Data Ingestion and Synchronization): 0, DataSync mode(only Data Synchronization, real-time data sync in stream way): 1,"
+ " DataSync mode(only Data Synchronization, offline data sync in batch way): 2")
private Integer inlongGroupMode;

@ApiModelProperty(value = "Current user", hidden = true)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,8 +86,9 @@ public abstract class InlongGroupRequest extends BaseInlongGroup {
@Range(min = 0, max = 1, message = "default is 1, only supports [0: disable, 1: enable]")
private Integer enableCreateResource;

@ApiModelProperty(value = "Standard mode(include Data Ingestion and Synchronization): 0, DataSync mode(only Data Synchronization): 1")
@Range(min = 0, max = 1, message = "default is 0, only supports [0: Standard, 1: DataSync]")
@ApiModelProperty(value = "Standard mode(include Data Ingestion and Synchronization): 0, DataSync mode(only Data Synchronization, real-time data sync in stream way): 1,"
+ " DataSync mode(only Data Synchronization, offline data sync in batch way): 2")
@Range(min = 0, max = 2, message = "default is 0, only supports [0: Standard, 1: DataSync, 2: DataSyncOffline]")
private Integer inlongGroupMode;

@ApiModelProperty(value = "Data report type, default is 0.\n"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -276,7 +276,7 @@ public List<AuditVO> listByCondition(AuditRequest request) throws Exception {

if (CollectionUtils.isEmpty(request.getAuditIds())) {
// properly overwrite audit ids by role and stream config
if (InlongConstants.DATASYNC_MODE.equals(groupEntity.getInlongGroupMode())) {
if (InlongConstants.DATASYNC_REALTIME_MODE.equals(groupEntity.getInlongGroupMode())) {
auditIdMap.put(getAuditId(sourceNodeType, false), sourceNodeType);
request.setAuditIds(getAuditIds(groupId, streamId, sourceNodeType, sinkNodeType));
} else {
Expand Down Expand Up @@ -436,7 +436,7 @@ private List<String> getAuditIds(String groupId, String streamId, String sourceN
} else {
auditSet.add(getAuditId(sinkNodeType, true));
InlongGroupEntity inlongGroup = inlongGroupMapper.selectByGroupId(groupId);
if (InlongConstants.DATASYNC_MODE.equals(inlongGroup.getInlongGroupMode())) {
if (InlongConstants.DATASYNC_REALTIME_MODE.equals(inlongGroup.getInlongGroupMode())) {
auditSet.add(getAuditId(sourceNodeType, false));
} else {
auditSet.add(getAuditId(sinkNodeType, false));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -706,7 +706,7 @@ public Boolean startTagSwitch(String groupId, String clusterTag) {
InlongGroupInfo groupInfo = this.get(groupId);

// check if the group mode is data sync mode
if (InlongConstants.DATASYNC_MODE.equals(groupInfo.getInlongGroupMode())) {
if (InlongConstants.DATASYNC_REALTIME_MODE.equals(groupInfo.getInlongGroupMode())) {
String errMSg = String.format("no need to switch sync mode group = {}", groupId);
LOGGER.error(errMSg);
throw new BusinessException(errMSg);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ public ListenerResult listen(WorkflowContext context) throws WorkflowListenerExc

// update status of other related configs
if (InlongConstants.DISABLE_CREATE_RESOURCE.equals(groupInfo.getEnableCreateResource())) {
if (InlongConstants.DATASYNC_MODE.equals(groupInfo.getInlongGroupMode())) {
if (InlongConstants.DATASYNC_REALTIME_MODE.equals(groupInfo.getInlongGroupMode())) {
sourceService.updateStatus(groupId, null, SourceStatus.SOURCE_NORMAL.getCode(), operator);
} else {
sourceService.updateStatus(groupId, null, SourceStatus.TO_BE_ISSUED_ADD.getCode(), operator);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ public ListenerResult listen(WorkflowContext context) {
}

// if the inlong group is dataSync mode, the stream source needs to be processed.
if (InlongConstants.DATASYNC_MODE.equals(groupInfo.getInlongGroupMode())) {
if (InlongConstants.DATASYNC_REALTIME_MODE.equals(groupInfo.getInlongGroupMode())) {
changeSource4DataSync(groupId, operateType, operator);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ public ListenerResult listen(WorkflowContext context) throws WorkflowListenerExc

String operator = context.getOperator();
GroupOperateType operateType = groupProcessForm.getGroupOperateType();
if (InlongConstants.DATASYNC_MODE.equals(groupInfo.getInlongGroupMode())) {
if (InlongConstants.DATASYNC_REALTIME_MODE.equals(groupInfo.getInlongGroupMode())) {
log.warn("skip to execute QueueResourceListener as sync mode for groupId={}", groupId);
if (GroupOperateType.INIT.equals(operateType)) {
this.createQueueForStreams(groupInfo, groupProcessForm.getStreamInfos(), operator);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ public ListenerResult listen(WorkflowContext context) throws WorkflowListenerExc
// Update status of other related configs
streamService.updateStatus(groupId, streamId, StreamStatus.CONFIG_SUCCESSFUL.getCode(), operator);
streamService.updateWithoutCheck(streamInfo.genRequest(), operator);
if (InlongConstants.DATASYNC_MODE.equals(form.getGroupInfo().getInlongGroupMode())) {
if (InlongConstants.DATASYNC_REALTIME_MODE.equals(form.getGroupInfo().getInlongGroupMode())) {
sourceService.updateStatus(groupId, streamId, SourceStatus.SOURCE_NORMAL.getCode(), operator);
} else {
sourceService.updateStatus(groupId, streamId, SourceStatus.TO_BE_ISSUED_ADD.getCode(), operator);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ public void updateOpt(SourceRequest request, Integer groupStatus, Integer groupM
updateFieldOpt(entity, request.getFieldList());
return;
}
boolean allowUpdate = InlongConstants.DATASYNC_MODE.equals(groupMode)
boolean allowUpdate = InlongConstants.DATASYNC_REALTIME_MODE.equals(groupMode)
|| SourceStatus.ALLOWED_UPDATE.contains(entity.getStatus());
if (!allowUpdate) {
throw new BusinessException(ErrorCodeEnum.SOURCE_OPT_NOT_ALLOWED,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,7 @@ public Map<String, List<StreamSource>> getSourcesMap(InlongGroupInfo groupInfo,

// if the group mode is DATASYNC, just get all related stream sources
List<StreamSource> streamSources = this.listSource(groupId, null);
if (InlongConstants.DATASYNC_MODE.equals(groupInfo.getInlongGroupMode())) {
if (InlongConstants.DATASYNC_REALTIME_MODE.equals(groupInfo.getInlongGroupMode())) {
result = streamSources.stream()
.collect(Collectors.groupingBy(StreamSource::getInlongStreamId, HashMap::new,
Collectors.toCollection(ArrayList::new)));
Expand Down
Loading