Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixed #367 #368

Merged
merged 3 commits into from
Jun 30, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ public void childEvent(final CuratorFramework client, final TreeCacheEvent event
if (!jobConfig.getTypeConfig().getCoreConfig().isMisfire()) {
readyService.setMisfireDisabled(jobConfig.getJobName());
}
producerManager.reschedule(jobConfig);
producerManager.reschedule(jobConfig.getJobName());
} else if (isJobConfigNode(event, path, Type.NODE_REMOVED)) {
String jobName = path.substring(CloudJobConfigurationNode.ROOT.length() + 1, path.length());
producerManager.unschedule(jobName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -172,12 +172,19 @@ public void recordFailoverTask(final TaskContext taskContext) {
if (!jobConfigOptional.isPresent()) {
return;
}
if (isDisable(jobConfigOptional.get())) {
return;
}
CloudJobConfiguration jobConfig = jobConfigOptional.get();
if (jobConfig.getTypeConfig().getCoreConfig().isFailover() || CloudJobExecutionType.DAEMON == jobConfig.getJobExecutionType()) {
failoverService.add(taskContext);
}
}

private boolean isDisable(final CloudJobConfiguration jobConfiguration) {
return disableAppService.isDisabled(jobConfiguration.getAppName()) || disableJobService.isDisabled(jobConfiguration.getJobName());
}

/**
* 将瞬时作业放入待执行队列.
*
Expand Down Expand Up @@ -223,6 +230,13 @@ public Optional<String> getFailoverTaskId(final MetaInfo metaInfo) {
* @param jobName 作业名称
*/
public void addDaemonJobToReadyQueue(final String jobName) {
Optional<CloudJobConfiguration> jobConfigOptional = jobConfigService.load(jobName);
if (!jobConfigOptional.isPresent()) {
return;
}
if (isDisable(jobConfigOptional.get())) {
return;
}
readyService.addDaemon(jobName);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ public void update(final CloudJobConfiguration jobConfig) {
throw new JobConfigurationException("Cannot found job '%s', please register first.", jobConfig.getJobName());
}
configService.update(jobConfig);
reschedule(jobConfig);
reschedule(jobConfig.getJobName());
}

/**
Expand All @@ -130,7 +130,6 @@ public void deregister(final String jobName) {
if (jobConfig.isPresent()) {
disableJobService.remove(jobName);
configService.remove(jobName);
transientProducerScheduler.deregister(jobConfig.get());
}
unschedule(jobName);
}
Expand Down Expand Up @@ -162,16 +161,23 @@ public void unschedule(final String jobName) {
}
runningService.remove(jobName);
readyService.remove(Lists.newArrayList(jobName));
Optional<CloudJobConfiguration> jobConfig = configService.load(jobName);
if (jobConfig.isPresent()) {
transientProducerScheduler.deregister(jobConfig.get());
}
}

/**
* 重新调度作业.
*
* @param jobConfig 作业配置
* @param jobName 作业名称
*/
public void reschedule(final CloudJobConfiguration jobConfig) {
unschedule(jobConfig.getJobName());
schedule(jobConfig);
public void reschedule(final String jobName) {
unschedule(jobName);
Optional<CloudJobConfiguration> jobConfig = configService.load(jobName);
if (jobConfig.isPresent()) {
schedule(jobConfig.get());
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ private Trigger buildTrigger(final String cron) {
return TriggerBuilder.newTrigger().withIdentity(cron).withSchedule(CronScheduleBuilder.cronSchedule(cron).withMisfireHandlingInstructionDoNothing()).build();
}

void deregister(final CloudJobConfiguration jobConfig) {
synchronized void deregister(final CloudJobConfiguration jobConfig) {
repository.remove(jobConfig.getJobName());
String cron = jobConfig.getTypeConfig().getCoreConfig().getCron();
if (!repository.containsKey(buildJobKey(cron))) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,11 @@ public void disable(@PathParam("appName") final String appName) {
public void enable(@PathParam("appName") final String appName) throws JSONException {
if (appConfigService.load(appName).isPresent()) {
disableAppService.remove(appName);
for (CloudJobConfiguration each : jobConfigService.loadAll()) {
if (appName.equals(each.getAppName())) {
producerManager.reschedule(each.getJobName());
}
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -184,8 +184,10 @@ public boolean isDisabled(@PathParam("jobName") final String jobName) throws JSO
@DELETE
@Path("/{jobName}/disable")
public void enable(@PathParam("jobName") final String jobName) throws JSONException {
if (configService.load(jobName).isPresent()) {
Optional<CloudJobConfiguration> configOptional = configService.load(jobName);
if (configOptional.isPresent()) {
facadeService.enableJob(jobName);
producerManager.reschedule(jobName);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,31 +58,31 @@ public void setUp() throws NoSuchFieldException {
public void assertChildEventWhenDataIsNull() throws Exception {
cloudJobConfigurationListener.childEvent(null, new TreeCacheEvent(TreeCacheEvent.Type.NODE_ADDED, null));
verify(producerManager, times(0)).schedule(ArgumentMatchers.<CloudJobConfiguration>any());
verify(producerManager, times(0)).reschedule(ArgumentMatchers.<CloudJobConfiguration>any());
verify(producerManager, times(0)).reschedule(ArgumentMatchers.<String>any());
verify(producerManager, times(0)).unschedule(ArgumentMatchers.<String>any());
}

@Test
public void assertChildEventWhenIsNotConfigPath() throws Exception {
cloudJobConfigurationListener.childEvent(null, new TreeCacheEvent(TreeCacheEvent.Type.NODE_UPDATED, new ChildData("/other/test_job", null, "".getBytes())));
verify(producerManager, times(0)).schedule(ArgumentMatchers.<CloudJobConfiguration>any());
verify(producerManager, times(0)).reschedule(ArgumentMatchers.<CloudJobConfiguration>any());
verify(producerManager, times(0)).reschedule(ArgumentMatchers.<String>any());
verify(producerManager, times(0)).unschedule(ArgumentMatchers.<String>any());
}

@Test
public void assertChildEventWhenIsRootConfigPath() throws Exception {
cloudJobConfigurationListener.childEvent(null, new TreeCacheEvent(TreeCacheEvent.Type.NODE_REMOVED, new ChildData("/config/job", null, "".getBytes())));
verify(producerManager, times(0)).schedule(ArgumentMatchers.<CloudJobConfiguration>any());
verify(producerManager, times(0)).reschedule(ArgumentMatchers.<CloudJobConfiguration>any());
verify(producerManager, times(0)).reschedule(ArgumentMatchers.<String>any());
verify(producerManager, times(0)).unschedule(ArgumentMatchers.<String>any());
}

@Test
public void assertChildEventWhenStateIsAddAndIsConfigPathAndInvalidData() throws Exception {
cloudJobConfigurationListener.childEvent(null, new TreeCacheEvent(TreeCacheEvent.Type.NODE_ADDED, new ChildData("/config/job/test_job", null, "".getBytes())));
verify(producerManager, times(0)).schedule(ArgumentMatchers.<CloudJobConfiguration>any());
verify(producerManager, times(0)).reschedule(ArgumentMatchers.<CloudJobConfiguration>any());
verify(producerManager, times(0)).reschedule(ArgumentMatchers.<String>any());
verify(producerManager, times(0)).unschedule(ArgumentMatchers.<String>any());
}

Expand All @@ -96,23 +96,23 @@ public void assertChildEventWhenStateIsAddAndIsConfigPath() throws Exception {
public void assertChildEventWhenStateIsUpdateAndIsConfigPathAndTransientJob() throws Exception {
cloudJobConfigurationListener.childEvent(null, new TreeCacheEvent(TreeCacheEvent.Type.NODE_UPDATED, new ChildData("/config/job/test_job", null, CloudJsonConstants.getJobJson().getBytes())));
verify(readyService, times(0)).remove(Collections.singletonList("test_job"));
verify(producerManager).reschedule(ArgumentMatchers.<CloudJobConfiguration>any());
verify(producerManager).reschedule(ArgumentMatchers.<String>any());
}

@Test
public void assertChildEventWhenStateIsUpdateAndIsConfigPathAndDaemonJob() throws Exception {
cloudJobConfigurationListener.childEvent(null, new TreeCacheEvent(TreeCacheEvent.Type.NODE_UPDATED,
new ChildData("/config/job/test_job", null, CloudJsonConstants.getJobJson(CloudJobExecutionType.DAEMON).getBytes())));
verify(readyService).remove(Collections.singletonList("test_job"));
verify(producerManager).reschedule(ArgumentMatchers.<CloudJobConfiguration>any());
verify(producerManager).reschedule(ArgumentMatchers.<String>any());
}

@Test
public void assertChildEventWhenStateIsUpdateAndIsConfigPathAndMisfireDisabled() throws Exception {
cloudJobConfigurationListener.childEvent(null, new TreeCacheEvent(TreeCacheEvent.Type.NODE_UPDATED,
new ChildData("/config/job/test_job", null, CloudJsonConstants.getJobJson(false).getBytes())));
verify(readyService).setMisfireDisabled("test_job");
verify(producerManager).reschedule(ArgumentMatchers.<CloudJobConfiguration>any());
verify(producerManager).reschedule(ArgumentMatchers.<String>any());
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,7 @@ public void assertLoadJobConfigWhenAbsent() {

@Test
public void assertAddDaemonJobToReadyQueue() {
when(jobConfigService.load("test_job")).thenReturn(Optional.of(CloudJobConfigurationBuilder.createCloudJobConfiguration("test_job")));
facadeService.addDaemonJobToReadyQueue("test_job");
verify(readyService).addDaemon("test_job");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ public void assertUpdate() throws Exception {
public void assertDeregister() throws Exception {
when(getRegCenter().isExisted("/config/job/test_job")).thenReturn(false);
assertThat(sentRequest("http://127.0.0.1:19000/api/job/deregister", "DELETE", "test_job"), is(204));
verify(getRegCenter(), times(2)).get("/config/job/test_job");
verify(getRegCenter(), times(3)).get("/config/job/test_job");
}

@Test
Expand Down