Skip to content

Commit

Permalink
[INLONG-10436][Manager] Move schedule configuration from stream to gr…
Browse files Browse the repository at this point in the history
…oup (apache#10437)
  • Loading branch information
aloyszhang committed Jul 9, 2024
1 parent b9ca00b commit d297271
Show file tree
Hide file tree
Showing 24 changed files with 105 additions and 133 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.inlong.manager.common.enums.GroupOperateType;
import org.apache.inlong.manager.common.enums.TaskEvent;
import org.apache.inlong.manager.plugin.util.FlinkUtils;
import org.apache.inlong.manager.pojo.group.InlongGroupInfo;
import org.apache.inlong.manager.pojo.stream.InlongStreamExtInfo;
import org.apache.inlong.manager.pojo.stream.InlongStreamInfo;
import org.apache.inlong.manager.pojo.workflow.form.process.GroupResourceProcessForm;
Expand Down Expand Up @@ -76,6 +77,13 @@ public ListenerResult listen(WorkflowContext context) throws Exception {
}

GroupResourceProcessForm groupResourceForm = (GroupResourceProcessForm) processForm;
InlongGroupInfo groupInfo = groupResourceForm.getGroupInfo();
// do not build sort config if the group mode is offline
if (InlongConstants.DATASYNC_OFFLINE_MODE.equals(groupInfo.getInlongGroupMode())) {
log.info("no need to launching sort job for groupId={} as the mode is offline",
groupId);
return ListenerResult.success();
}
List<InlongStreamInfo> streamInfos = groupResourceForm.getStreamInfos();
int sinkCount = streamInfos.stream()
.map(s -> s.getSinkList() == null ? 0 : s.getSinkList().size())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,11 @@

package org.apache.inlong.manager.pojo.sort.util;

import org.apache.inlong.manager.common.consts.InlongConstants;
import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
import org.apache.inlong.manager.common.enums.TransformType;
import org.apache.inlong.manager.common.util.Preconditions;
import org.apache.inlong.manager.pojo.sink.StreamSink;
import org.apache.inlong.manager.pojo.source.StreamSource;
import org.apache.inlong.manager.pojo.stream.InlongStreamExtInfo;
import org.apache.inlong.manager.pojo.stream.InlongStreamInfo;
import org.apache.inlong.manager.pojo.stream.StreamNode;
import org.apache.inlong.manager.pojo.stream.StreamPipeline;
import org.apache.inlong.manager.pojo.stream.StreamTransform;
Expand All @@ -41,8 +38,6 @@

import com.google.gson.Gson;
import com.google.gson.JsonObject;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.StringUtils;

/**
* Utils of stream parse.
Expand Down Expand Up @@ -158,21 +153,4 @@ public static StreamPipeline parseStreamPipeline(String tempView, String inlongS
String.format(" should not be null for streamId=%s", inlongStreamId));
return GSON.fromJson(tempView, StreamPipeline.class);
}

public static String getStreamExtProperty(String key, InlongStreamInfo streamInfo) {
if (StringUtils.isNotBlank(key) && streamInfo != null && CollectionUtils.isNotEmpty(streamInfo.getExtList())) {
for (InlongStreamExtInfo ext : streamInfo.getExtList()) {
if (key.equalsIgnoreCase(ext.getKeyName())) {
return ext.getKeyValue();
}
}
}
return null;
}

public static boolean isRegisterScheduleSuccess(InlongStreamInfo streamInfo) {
return InlongConstants.REGISTERED
.equalsIgnoreCase(getStreamExtProperty(InlongConstants.REGISTER_SCHEDULE_STATUS, streamInfo));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
* limitations under the License.
*/

package org.apache.inlong.schedule;
package org.apache.inlong.manager.schedule;

import org.apache.inlong.manager.pojo.schedule.ScheduleInfo;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
* limitations under the License.
*/

package org.apache.inlong.schedule;
package org.apache.inlong.manager.schedule;

import org.apache.inlong.manager.pojo.schedule.ScheduleInfo;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
* limitations under the License.
*/

package org.apache.inlong.schedule;
package org.apache.inlong.manager.schedule;

import lombok.Getter;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
* limitations under the License.
*/

package org.apache.inlong.schedule;
package org.apache.inlong.manager.schedule;

import lombok.Getter;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
* limitations under the License.
*/

package org.apache.inlong.schedule.exception;
package org.apache.inlong.manager.schedule.exception;

/**
* Exceptions occur in the schedule procedure.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
* limitations under the License.
*/

package org.apache.inlong.schedule.quartz;
package org.apache.inlong.manager.schedule.quartz;

import org.apache.inlong.manager.pojo.schedule.ScheduleInfo;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,10 @@
* limitations under the License.
*/

package org.apache.inlong.schedule.quartz;
package org.apache.inlong.manager.schedule.quartz;

import org.apache.inlong.manager.pojo.schedule.ScheduleInfo;
import org.apache.inlong.schedule.ScheduleEngineClient;
import org.apache.inlong.manager.schedule.ScheduleEngineClient;

/**
* Built-in implementation of schedule engine client corresponding with {@link QuartzScheduleEngine}.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,11 @@
* limitations under the License.
*/

package org.apache.inlong.schedule.quartz;
package org.apache.inlong.manager.schedule.quartz;

import org.apache.inlong.manager.pojo.schedule.ScheduleInfo;
import org.apache.inlong.schedule.ScheduleEngine;
import org.apache.inlong.schedule.exception.QuartzScheduleException;
import org.apache.inlong.manager.schedule.ScheduleEngine;
import org.apache.inlong.manager.schedule.exception.QuartzScheduleException;

import com.google.common.annotations.VisibleForTesting;
import lombok.Getter;
Expand All @@ -35,8 +35,8 @@
import java.util.HashSet;
import java.util.Set;

import static org.apache.inlong.schedule.util.ScheduleUtils.genQuartzJobDetail;
import static org.apache.inlong.schedule.util.ScheduleUtils.genQuartzTrigger;
import static org.apache.inlong.manager.schedule.util.ScheduleUtils.genQuartzJobDetail;
import static org.apache.inlong.manager.schedule.util.ScheduleUtils.genQuartzTrigger;

/**
* The default implementation of schedule engine based on Quartz scheduler. Response for processing
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
* limitations under the License.
*/

package org.apache.inlong.schedule.quartz;
package org.apache.inlong.manager.schedule.quartz;

import org.quartz.JobDetail;
import org.quartz.JobKey;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,13 @@
* limitations under the License.
*/

package org.apache.inlong.schedule.util;
package org.apache.inlong.manager.schedule.util;

import org.apache.inlong.manager.pojo.schedule.ScheduleInfo;
import org.apache.inlong.schedule.ScheduleType;
import org.apache.inlong.schedule.ScheduleUnit;
import org.apache.inlong.schedule.exception.QuartzScheduleException;
import org.apache.inlong.schedule.quartz.QuartzOfflineSyncJob;
import org.apache.inlong.manager.schedule.ScheduleType;
import org.apache.inlong.manager.schedule.ScheduleUnit;
import org.apache.inlong.manager.schedule.exception.QuartzScheduleException;
import org.apache.inlong.manager.schedule.quartz.QuartzOfflineSyncJob;

import org.apache.commons.lang3.StringUtils;
import org.quartz.CronScheduleBuilder;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,14 @@
* limitations under the License.
*/

package org.apache.inlong.schedule;
package org.apache.inlong.manager.schedule;

import org.apache.inlong.manager.pojo.schedule.ScheduleInfo;
import org.apache.inlong.schedule.exception.QuartzScheduleException;
import org.apache.inlong.manager.schedule.exception.QuartzScheduleException;

import java.sql.Timestamp;

import static org.apache.inlong.schedule.ScheduleUnit.SECOND;
import static org.apache.inlong.manager.schedule.ScheduleUnit.SECOND;

public class BaseScheduleTest {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
* limitations under the License.
*/

package org.apache.inlong.schedule.quartz;
package org.apache.inlong.manager.schedule.quartz;

import org.quartz.JobExecutionContext;
import org.quartz.JobExecutionException;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,10 @@
* limitations under the License.
*/

package org.apache.inlong.schedule.quartz;
package org.apache.inlong.manager.schedule.quartz;

import org.apache.inlong.manager.pojo.schedule.ScheduleInfo;
import org.apache.inlong.schedule.BaseScheduleTest;
import org.apache.inlong.manager.schedule.BaseScheduleTest;

import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
Expand All @@ -27,7 +27,7 @@

import java.util.concurrent.TimeUnit;

import static org.apache.inlong.schedule.ScheduleUnit.SECOND;
import static org.apache.inlong.manager.schedule.ScheduleUnit.SECOND;
import static org.awaitility.Awaitility.await;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,12 @@
* limitations under the License.
*/

package org.apache.inlong.schedule.util;
package org.apache.inlong.manager.schedule.util;

import org.apache.inlong.manager.pojo.schedule.ScheduleInfo;
import org.apache.inlong.schedule.BaseScheduleTest;
import org.apache.inlong.schedule.exception.QuartzScheduleException;
import org.apache.inlong.schedule.quartz.QuartzOfflineSyncJob;
import org.apache.inlong.manager.schedule.BaseScheduleTest;
import org.apache.inlong.manager.schedule.exception.QuartzScheduleException;
import org.apache.inlong.manager.schedule.quartz.QuartzOfflineSyncJob;

import org.junit.jupiter.api.Test;
import org.quartz.CronScheduleBuilder;
Expand All @@ -35,13 +35,13 @@

import java.util.Date;

import static org.apache.inlong.schedule.ScheduleUnit.DAY;
import static org.apache.inlong.schedule.ScheduleUnit.HOUR;
import static org.apache.inlong.schedule.ScheduleUnit.MINUTE;
import static org.apache.inlong.schedule.ScheduleUnit.MONTH;
import static org.apache.inlong.schedule.ScheduleUnit.ONE_WAY;
import static org.apache.inlong.schedule.ScheduleUnit.WEEK;
import static org.apache.inlong.schedule.ScheduleUnit.YEAR;
import static org.apache.inlong.manager.schedule.ScheduleUnit.DAY;
import static org.apache.inlong.manager.schedule.ScheduleUnit.HOUR;
import static org.apache.inlong.manager.schedule.ScheduleUnit.MINUTE;
import static org.apache.inlong.manager.schedule.ScheduleUnit.MONTH;
import static org.apache.inlong.manager.schedule.ScheduleUnit.ONE_WAY;
import static org.apache.inlong.manager.schedule.ScheduleUnit.WEEK;
import static org.apache.inlong.manager.schedule.ScheduleUnit.YEAR;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertInstanceOf;
import static org.junit.jupiter.api.Assertions.assertNotNull;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.apache.inlong.manager.common.plugin.Plugin;
import org.apache.inlong.manager.common.plugin.PluginBinder;
import org.apache.inlong.manager.service.listener.queue.QueueResourceListener;
import org.apache.inlong.manager.service.listener.schedule.GroupScheduleResourceListener;
import org.apache.inlong.manager.service.listener.sink.SinkResourceListener;
import org.apache.inlong.manager.service.listener.sort.SortConfigListener;
import org.apache.inlong.manager.service.listener.source.SourceDeleteListener;
Expand All @@ -29,6 +30,7 @@
import org.apache.inlong.manager.workflow.definition.ServiceTaskType;
import org.apache.inlong.manager.workflow.definition.TaskListenerFactory;
import org.apache.inlong.manager.workflow.event.task.QueueOperateListener;
import org.apache.inlong.manager.workflow.event.task.ScheduleOperateListener;
import org.apache.inlong.manager.workflow.event.task.SortOperateListener;
import org.apache.inlong.manager.workflow.event.task.SourceOperateListener;
import org.apache.inlong.manager.workflow.event.task.TaskEventListener;
Expand Down Expand Up @@ -57,6 +59,7 @@ public class GroupTaskListenerFactory implements PluginBinder, TaskListenerFacto
private List<SourceOperateListener> sourceOperateListeners;
private List<QueueOperateListener> queueOperateListeners;
private List<SortOperateListener> sortOperateListeners;
private List<ScheduleOperateListener> scheduleOperateListeners;

@Autowired
private SourceStopListener sourceStopListener;
Expand All @@ -70,6 +73,8 @@ public class GroupTaskListenerFactory implements PluginBinder, TaskListenerFacto
private SinkResourceListener sinkResourceListener;
@Autowired
private SortConfigListener sortConfigListener;
@Autowired
private GroupScheduleResourceListener groupScheduleResourceListener;

@PostConstruct
public void init() {
Expand All @@ -81,6 +86,8 @@ public void init() {
queueOperateListeners.add(queueResourceListener);
sortOperateListeners = new LinkedList<>();
sortOperateListeners.add(sortConfigListener);
scheduleOperateListeners = new LinkedList<>();
scheduleOperateListeners.add(groupScheduleResourceListener);
}

@Override
Expand Down Expand Up @@ -124,6 +131,9 @@ public List<? extends TaskEventListener> get(WorkflowContext workflowContext, Se
return Lists.newArrayList(sourceOperateListeners);
case INIT_SINK:
return Collections.singletonList(sinkResourceListener);
case INIT_SCHEDULE:
List<ScheduleOperateListener> scheduleOperateListeners = getScheduleOperateListener(workflowContext);
return Lists.newArrayList(scheduleOperateListeners);
default:
throw new IllegalArgumentException(String.format("Unsupported ServiceTaskType %s", taskType));
}
Expand Down Expand Up @@ -177,4 +187,14 @@ public List<SortOperateListener> getSortOperateListener(WorkflowContext context)
return listeners;
}

public List<ScheduleOperateListener> getScheduleOperateListener(WorkflowContext context) {
List<ScheduleOperateListener> listeners = new ArrayList<>();
for (ScheduleOperateListener listener : scheduleOperateListeners) {
if (listener != null && listener.accept(context)) {
listeners.add(listener);
}
}
return listeners;
}

}
Loading

0 comments on commit d297271

Please sign in to comment.