Skip to content

Commit

Permalink
[INLONG-9781][Manager] Add offline sync task type definition (#9787)
Browse files Browse the repository at this point in the history
  • Loading branch information
aloyszhang committed May 16, 2024
1 parent b9c9688 commit 84511f0
Show file tree
Hide file tree
Showing 15 changed files with 33 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ public static InlongGroupInfo createGroupInfo() {
// set enable zk, create resource, group mode, and cluster tag
pulsarInfo.setEnableZookeeper(InlongConstants.DISABLE_ZK);
pulsarInfo.setEnableCreateResource(InlongConstants.ENABLE_CREATE_RESOURCE);
pulsarInfo.setInlongGroupMode(InlongConstants.DATASYNC_MODE);
pulsarInfo.setInlongGroupMode(InlongConstants.DATASYNC_REALTIME_MODE);
pulsarInfo.setInlongClusterTag("default_cluster");

pulsarInfo.setDailyRecords(10000000);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,8 @@ public class InlongConstants {
public static final Integer DELETED_STATUS = 10;

public static final Integer STANDARD_MODE = 0;
public static final Integer DATASYNC_MODE = 1;
public static final Integer DATASYNC_REALTIME_MODE = 1;
public static final Integer DATASYNC_OFFLINE_MODE = 2;

public static final Integer DISABLE_ZK = 0;
public static final Integer ENABLE_ZK = 1;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,18 @@ public enum GroupMode {
STANDARD("standard"),

/**
* DataSync mode(only Data Synchronization): group init only with sort in InLong Cluster
* StreamSource -> Sort -> StreamSink
* DataSync mode(only Data Synchronization): real-time data sync in stream way, group init only with
* sort in InLong Cluster.
* StreamSource -> Sort -> Sink
*/
DATASYNC("datasync");
DATASYNC("datasync"),

/**
* DataSync mode(only Data Synchronization): offline data sync in batch way, group init only with sort
* in InLong Cluster.
* BatchSource -> Sort -> Sink
*/
DATASYNC_BATCH("datasync_offline");

@Getter
private final String mode;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ public boolean accept(WorkflowContext workflowContext) {
}

log.info("add startup group listener for groupId [{}]", groupId);
return InlongConstants.DATASYNC_MODE.equals(groupProcessForm.getGroupInfo().getInlongGroupMode());
return InlongConstants.DATASYNC_REALTIME_MODE.equals(groupProcessForm.getGroupInfo().getInlongGroupMode());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,8 @@ public abstract class InlongGroupInfo extends BaseInlongGroup {
@ApiModelProperty(value = "Whether to enable create resource? 0: disable, 1: enable")
private Integer enableCreateResource;

@ApiModelProperty(value = "Standard mode(include Data Ingestion and Synchronization): 0, DataSync mode(only Data Synchronization): 1")
@ApiModelProperty(value = "Standard mode(include Data Ingestion and Synchronization): 0, DataSync mode(only Data Synchronization, real-time data sync in stream way): 1,"
+ " DataSync mode(only Data Synchronization, offline data sync in batch way): 2")
private Integer inlongGroupMode;

@ApiModelProperty(value = "Data report type, default is 0.\n"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,8 @@ public class InlongGroupPageRequest extends PageRequest {
@ApiModelProperty(value = "The inlong cluster tag list")
private List<String> clusterTagList;

@ApiModelProperty(value = "Standard mode(include Data Ingestion and Synchronization): 0, DataSync mode(only Data Synchronization): 1")
@ApiModelProperty(value = "Standard mode(include Data Ingestion and Synchronization): 0, DataSync mode(only Data Synchronization, real-time data sync in stream way): 1,"
+ " DataSync mode(only Data Synchronization, offline data sync in batch way): 2")
private Integer inlongGroupMode;

@ApiModelProperty(value = "Current user", hidden = true)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,8 +86,9 @@ public abstract class InlongGroupRequest extends BaseInlongGroup {
@Range(min = 0, max = 1, message = "default is 1, only supports [0: disable, 1: enable]")
private Integer enableCreateResource;

@ApiModelProperty(value = "Standard mode(include Data Ingestion and Synchronization): 0, DataSync mode(only Data Synchronization): 1")
@Range(min = 0, max = 1, message = "default is 0, only supports [0: Standard, 1: DataSync]")
@ApiModelProperty(value = "Standard mode(include Data Ingestion and Synchronization): 0, DataSync mode(only Data Synchronization, real-time data sync in stream way): 1,"
+ " DataSync mode(only Data Synchronization, offline data sync in batch way): 2")
@Range(min = 0, max = 2, message = "default is 0, only supports [0: Standard, 1: DataSync, 2: DataSyncOffline]")
private Integer inlongGroupMode;

@ApiModelProperty(value = "Data report type, default is 0.\n"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,8 @@ public List<AuditVO> listByCondition(AuditRequest request) throws Exception {

if (CollectionUtils.isEmpty(request.getAuditIds())) {
// properly overwrite audit ids by role and stream config
if (InlongConstants.DATASYNC_MODE.equals(groupEntity.getInlongGroupMode())) {
if (InlongConstants.DATASYNC_REALTIME_MODE.equals(groupEntity.getInlongGroupMode())
|| InlongConstants.DATASYNC_OFFLINE_MODE.equals(groupEntity.getInlongGroupMode())) {
auditIdMap.put(getAuditId(sourceNodeType, IndicatorType.RECEIVED_SUCCESS), sourceNodeType);
request.setAuditIds(getAuditIds(groupId, streamId, sourceNodeType, sinkNodeType));
} else {
Expand Down Expand Up @@ -320,7 +321,8 @@ private List<String> getAuditIds(String groupId, String streamId, String sourceN
} else {
auditSet.add(getAuditId(sinkNodeType, IndicatorType.SEND_SUCCESS));
InlongGroupEntity inlongGroup = inlongGroupMapper.selectByGroupId(groupId);
if (InlongConstants.DATASYNC_MODE.equals(inlongGroup.getInlongGroupMode())) {
if (InlongConstants.DATASYNC_REALTIME_MODE.equals(inlongGroup.getInlongGroupMode())
|| InlongConstants.DATASYNC_OFFLINE_MODE.equals(inlongGroup.getInlongGroupMode())) {
auditSet.add(getAuditId(sourceNodeType, IndicatorType.RECEIVED_SUCCESS));
} else {
auditSet.add(getAuditId(sinkNodeType, IndicatorType.RECEIVED_SUCCESS));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -731,7 +731,7 @@ public Boolean startTagSwitch(String groupId, String clusterTag) {
InlongGroupInfo groupInfo = this.get(groupId);

// check if the group mode is data sync mode
if (InlongConstants.DATASYNC_MODE.equals(groupInfo.getInlongGroupMode())) {
if (InlongConstants.DATASYNC_REALTIME_MODE.equals(groupInfo.getInlongGroupMode())) {
String errMSg = String.format("no need to switch sync mode group = {}", groupId);
LOGGER.error(errMSg);
throw new BusinessException(errMSg);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ public ListenerResult listen(WorkflowContext context) throws WorkflowListenerExc

// update status of other related configs
if (InlongConstants.DISABLE_CREATE_RESOURCE.equals(groupInfo.getEnableCreateResource())) {
if (InlongConstants.DATASYNC_MODE.equals(groupInfo.getInlongGroupMode())) {
if (InlongConstants.DATASYNC_REALTIME_MODE.equals(groupInfo.getInlongGroupMode())) {
sourceService.updateStatus(groupId, null, SourceStatus.SOURCE_NORMAL.getCode(), operator);
} else {
sourceService.updateStatus(groupId, null, SourceStatus.TO_BE_ISSUED_ADD.getCode(), operator);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ public ListenerResult listen(WorkflowContext context) {
}

// if the inlong group is dataSync mode, the stream source needs to be processed.
if (InlongConstants.DATASYNC_MODE.equals(groupInfo.getInlongGroupMode())) {
if (InlongConstants.DATASYNC_REALTIME_MODE.equals(groupInfo.getInlongGroupMode())) {
changeSource4DataSync(groupId, operateType, operator);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ public ListenerResult listen(WorkflowContext context) throws WorkflowListenerExc

String operator = context.getOperator();
GroupOperateType operateType = groupProcessForm.getGroupOperateType();
if (InlongConstants.DATASYNC_MODE.equals(groupInfo.getInlongGroupMode())) {
if (InlongConstants.DATASYNC_REALTIME_MODE.equals(groupInfo.getInlongGroupMode())) {
log.warn("skip to execute QueueResourceListener as sync mode for groupId={}", groupId);
if (GroupOperateType.INIT.equals(operateType)) {
this.createQueueForStreams(groupInfo, groupProcessForm.getStreamInfos(), operator);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ public ListenerResult listen(WorkflowContext context) throws WorkflowListenerExc
// Update status of other related configs
streamService.updateStatus(groupId, streamId, StreamStatus.CONFIG_SUCCESSFUL.getCode(), operator);
streamService.updateWithoutCheck(streamInfo.genRequest(), operator);
if (InlongConstants.DATASYNC_MODE.equals(form.getGroupInfo().getInlongGroupMode())) {
if (InlongConstants.DATASYNC_REALTIME_MODE.equals(form.getGroupInfo().getInlongGroupMode())) {
sourceService.updateStatus(groupId, streamId, SourceStatus.SOURCE_NORMAL.getCode(), operator);
} else {
sourceService.updateStatus(groupId, streamId, SourceStatus.TO_BE_ISSUED_ADD.getCode(), operator);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ public void updateOpt(SourceRequest request, Integer groupStatus, Integer groupM
updateFieldOpt(entity, request.getFieldList());
return;
}
boolean allowUpdate = InlongConstants.DATASYNC_MODE.equals(groupMode)
boolean allowUpdate = InlongConstants.DATASYNC_REALTIME_MODE.equals(groupMode)
|| SourceStatus.ALLOWED_UPDATE.contains(entity.getStatus());
if (!allowUpdate) {
throw new BusinessException(ErrorCodeEnum.SOURCE_OPT_NOT_ALLOWED,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -248,7 +248,7 @@ public Map<String, List<StreamSource>> getSourcesMap(InlongGroupInfo groupInfo,

// if the group mode is DATASYNC, just get all related stream sources
List<StreamSource> streamSources = this.listSource(groupId, null);
if (InlongConstants.DATASYNC_MODE.equals(groupInfo.getInlongGroupMode())) {
if (InlongConstants.DATASYNC_REALTIME_MODE.equals(groupInfo.getInlongGroupMode())) {
result = streamSources.stream()
.collect(Collectors.groupingBy(StreamSource::getInlongStreamId, HashMap::new,
Collectors.toCollection(ArrayList::new)));
Expand Down

0 comments on commit 84511f0

Please sign in to comment.