From 064eca0ab1804af654599bb06eae2ad6a1e59ea5 Mon Sep 17 00:00:00 2001 From: gaohongtao Date: Thu, 29 Jun 2017 10:38:21 +0800 Subject: [PATCH] Fixed #367, fixed disable/enable job/app process --- .../job/CloudJobConfigurationListener.java | 2 +- .../cloud/scheduler/mesos/FacadeService.java | 14 ++++++++++++++ .../scheduler/producer/ProducerManager.java | 18 ++++++++++++------ .../producer/TransientProducerScheduler.java | 2 +- .../scheduler/restful/CloudAppRestfulApi.java | 5 +++++ .../scheduler/restful/CloudJobRestfulApi.java | 4 +++- .../job/CloudJobConfigurationListenerTest.java | 14 +++++++------- .../scheduler/mesos/FacadeServiceTest.java | 1 + .../restful/CloudJobRestfulApiTest.java | 2 +- 9 files changed, 45 insertions(+), 17 deletions(-) diff --git a/elastic-job-cloud/elastic-job-cloud-scheduler/src/main/java/com/dangdang/ddframe/job/cloud/scheduler/config/job/CloudJobConfigurationListener.java b/elastic-job-cloud/elastic-job-cloud-scheduler/src/main/java/com/dangdang/ddframe/job/cloud/scheduler/config/job/CloudJobConfigurationListener.java index 0f9e58ae78..bccbb183a1 100644 --- a/elastic-job-cloud/elastic-job-cloud-scheduler/src/main/java/com/dangdang/ddframe/job/cloud/scheduler/config/job/CloudJobConfigurationListener.java +++ b/elastic-job-cloud/elastic-job-cloud-scheduler/src/main/java/com/dangdang/ddframe/job/cloud/scheduler/config/job/CloudJobConfigurationListener.java @@ -70,7 +70,7 @@ public void childEvent(final CuratorFramework client, final TreeCacheEvent event if (!jobConfig.getTypeConfig().getCoreConfig().isMisfire()) { readyService.setMisfireDisabled(jobConfig.getJobName()); } - producerManager.reschedule(jobConfig); + producerManager.reschedule(jobConfig.getJobName()); } else if (isJobConfigNode(event, path, Type.NODE_REMOVED)) { String jobName = path.substring(CloudJobConfigurationNode.ROOT.length() + 1, path.length()); producerManager.unschedule(jobName); diff --git a/elastic-job-cloud/elastic-job-cloud-scheduler/src/main/java/com/dangdang/ddframe/job/cloud/scheduler/mesos/FacadeService.java b/elastic-job-cloud/elastic-job-cloud-scheduler/src/main/java/com/dangdang/ddframe/job/cloud/scheduler/mesos/FacadeService.java index 527c58e1a9..d27c36c532 100644 --- a/elastic-job-cloud/elastic-job-cloud-scheduler/src/main/java/com/dangdang/ddframe/job/cloud/scheduler/mesos/FacadeService.java +++ b/elastic-job-cloud/elastic-job-cloud-scheduler/src/main/java/com/dangdang/ddframe/job/cloud/scheduler/mesos/FacadeService.java @@ -172,12 +172,19 @@ public void recordFailoverTask(final TaskContext taskContext) { if (!jobConfigOptional.isPresent()) { return; } + if (isDisable(jobConfigOptional.get())) { + return; + } CloudJobConfiguration jobConfig = jobConfigOptional.get(); if (jobConfig.getTypeConfig().getCoreConfig().isFailover() || CloudJobExecutionType.DAEMON == jobConfig.getJobExecutionType()) { failoverService.add(taskContext); } } + private boolean isDisable(final CloudJobConfiguration jobConfiguration) { + return disableAppService.isDisabled(jobConfiguration.getAppName()) || disableJobService.isDisabled(jobConfiguration.getJobName()); + } + /** * 将瞬时作业放入待执行队列. * @@ -223,6 +230,13 @@ public Optional getFailoverTaskId(final MetaInfo metaInfo) { * @param jobName 作业名称 */ public void addDaemonJobToReadyQueue(final String jobName) { + Optional jobConfigOptional = jobConfigService.load(jobName); + if (!jobConfigOptional.isPresent()) { + return; + } + if (isDisable(jobConfigOptional.get())) { + return; + } readyService.addDaemon(jobName); } diff --git a/elastic-job-cloud/elastic-job-cloud-scheduler/src/main/java/com/dangdang/ddframe/job/cloud/scheduler/producer/ProducerManager.java b/elastic-job-cloud/elastic-job-cloud-scheduler/src/main/java/com/dangdang/ddframe/job/cloud/scheduler/producer/ProducerManager.java index 71d4c6d37b..e681e656e3 100644 --- a/elastic-job-cloud/elastic-job-cloud-scheduler/src/main/java/com/dangdang/ddframe/job/cloud/scheduler/producer/ProducerManager.java +++ b/elastic-job-cloud/elastic-job-cloud-scheduler/src/main/java/com/dangdang/ddframe/job/cloud/scheduler/producer/ProducerManager.java @@ -117,7 +117,7 @@ public void update(final CloudJobConfiguration jobConfig) { throw new JobConfigurationException("Cannot found job '%s', please register first.", jobConfig.getJobName()); } configService.update(jobConfig); - reschedule(jobConfig); + reschedule(jobConfig.getJobName()); } /** @@ -130,7 +130,6 @@ public void deregister(final String jobName) { if (jobConfig.isPresent()) { disableJobService.remove(jobName); configService.remove(jobName); - transientProducerScheduler.deregister(jobConfig.get()); } unschedule(jobName); } @@ -162,16 +161,23 @@ public void unschedule(final String jobName) { } runningService.remove(jobName); readyService.remove(Lists.newArrayList(jobName)); + Optional jobConfig = configService.load(jobName); + if (jobConfig.isPresent()) { + transientProducerScheduler.deregister(jobConfig.get()); + } } /** * 重新调度作业. * - * @param jobConfig 作业配置 + * @param jobName 作业名称 */ - public void reschedule(final CloudJobConfiguration jobConfig) { - unschedule(jobConfig.getJobName()); - schedule(jobConfig); + public void reschedule(final String jobName) { + unschedule(jobName); + Optional jobConfig = configService.load(jobName); + if (jobConfig.isPresent()) { + schedule(jobConfig.get()); + } } /** diff --git a/elastic-job-cloud/elastic-job-cloud-scheduler/src/main/java/com/dangdang/ddframe/job/cloud/scheduler/producer/TransientProducerScheduler.java b/elastic-job-cloud/elastic-job-cloud-scheduler/src/main/java/com/dangdang/ddframe/job/cloud/scheduler/producer/TransientProducerScheduler.java index bfb793cc05..93d379f9d3 100644 --- a/elastic-job-cloud/elastic-job-cloud-scheduler/src/main/java/com/dangdang/ddframe/job/cloud/scheduler/producer/TransientProducerScheduler.java +++ b/elastic-job-cloud/elastic-job-cloud-scheduler/src/main/java/com/dangdang/ddframe/job/cloud/scheduler/producer/TransientProducerScheduler.java @@ -112,7 +112,7 @@ private Trigger buildTrigger(final String cron) { return TriggerBuilder.newTrigger().withIdentity(cron).withSchedule(CronScheduleBuilder.cronSchedule(cron).withMisfireHandlingInstructionDoNothing()).build(); } - void deregister(final CloudJobConfiguration jobConfig) { + synchronized void deregister(final CloudJobConfiguration jobConfig) { repository.remove(jobConfig.getJobName()); String cron = jobConfig.getTypeConfig().getCoreConfig().getCron(); if (!repository.containsKey(buildJobKey(cron))) { diff --git a/elastic-job-cloud/elastic-job-cloud-scheduler/src/main/java/com/dangdang/ddframe/job/cloud/scheduler/restful/CloudAppRestfulApi.java b/elastic-job-cloud/elastic-job-cloud-scheduler/src/main/java/com/dangdang/ddframe/job/cloud/scheduler/restful/CloudAppRestfulApi.java index 5c2c7f6a23..9d849510e5 100644 --- a/elastic-job-cloud/elastic-job-cloud-scheduler/src/main/java/com/dangdang/ddframe/job/cloud/scheduler/restful/CloudAppRestfulApi.java +++ b/elastic-job-cloud/elastic-job-cloud-scheduler/src/main/java/com/dangdang/ddframe/job/cloud/scheduler/restful/CloudAppRestfulApi.java @@ -186,6 +186,11 @@ public void disable(@PathParam("appName") final String appName) { public void enable(@PathParam("appName") final String appName) throws JSONException { if (appConfigService.load(appName).isPresent()) { disableAppService.remove(appName); + for (CloudJobConfiguration each : jobConfigService.loadAll()) { + if (appName.equals(each.getAppName())) { + producerManager.reschedule(each.getJobName()); + } + } } } diff --git a/elastic-job-cloud/elastic-job-cloud-scheduler/src/main/java/com/dangdang/ddframe/job/cloud/scheduler/restful/CloudJobRestfulApi.java b/elastic-job-cloud/elastic-job-cloud-scheduler/src/main/java/com/dangdang/ddframe/job/cloud/scheduler/restful/CloudJobRestfulApi.java index 2ad20f6dbf..2505b04355 100644 --- a/elastic-job-cloud/elastic-job-cloud-scheduler/src/main/java/com/dangdang/ddframe/job/cloud/scheduler/restful/CloudJobRestfulApi.java +++ b/elastic-job-cloud/elastic-job-cloud-scheduler/src/main/java/com/dangdang/ddframe/job/cloud/scheduler/restful/CloudJobRestfulApi.java @@ -184,8 +184,10 @@ public boolean isDisabled(@PathParam("jobName") final String jobName) throws JSO @DELETE @Path("/{jobName}/disable") public void enable(@PathParam("jobName") final String jobName) throws JSONException { - if (configService.load(jobName).isPresent()) { + Optional configOptional = configService.load(jobName); + if (configOptional.isPresent()) { facadeService.enableJob(jobName); + producerManager.reschedule(jobName); } } diff --git a/elastic-job-cloud/elastic-job-cloud-scheduler/src/test/java/com/dangdang/ddframe/job/cloud/scheduler/config/job/CloudJobConfigurationListenerTest.java b/elastic-job-cloud/elastic-job-cloud-scheduler/src/test/java/com/dangdang/ddframe/job/cloud/scheduler/config/job/CloudJobConfigurationListenerTest.java index 99f7ae93ef..e8dc9213b5 100644 --- a/elastic-job-cloud/elastic-job-cloud-scheduler/src/test/java/com/dangdang/ddframe/job/cloud/scheduler/config/job/CloudJobConfigurationListenerTest.java +++ b/elastic-job-cloud/elastic-job-cloud-scheduler/src/test/java/com/dangdang/ddframe/job/cloud/scheduler/config/job/CloudJobConfigurationListenerTest.java @@ -58,7 +58,7 @@ public void setUp() throws NoSuchFieldException { public void assertChildEventWhenDataIsNull() throws Exception { cloudJobConfigurationListener.childEvent(null, new TreeCacheEvent(TreeCacheEvent.Type.NODE_ADDED, null)); verify(producerManager, times(0)).schedule(ArgumentMatchers.any()); - verify(producerManager, times(0)).reschedule(ArgumentMatchers.any()); + verify(producerManager, times(0)).reschedule(ArgumentMatchers.any()); verify(producerManager, times(0)).unschedule(ArgumentMatchers.any()); } @@ -66,7 +66,7 @@ public void assertChildEventWhenDataIsNull() throws Exception { public void assertChildEventWhenIsNotConfigPath() throws Exception { cloudJobConfigurationListener.childEvent(null, new TreeCacheEvent(TreeCacheEvent.Type.NODE_UPDATED, new ChildData("/other/test_job", null, "".getBytes()))); verify(producerManager, times(0)).schedule(ArgumentMatchers.any()); - verify(producerManager, times(0)).reschedule(ArgumentMatchers.any()); + verify(producerManager, times(0)).reschedule(ArgumentMatchers.any()); verify(producerManager, times(0)).unschedule(ArgumentMatchers.any()); } @@ -74,7 +74,7 @@ public void assertChildEventWhenIsNotConfigPath() throws Exception { public void assertChildEventWhenIsRootConfigPath() throws Exception { cloudJobConfigurationListener.childEvent(null, new TreeCacheEvent(TreeCacheEvent.Type.NODE_REMOVED, new ChildData("/config/job", null, "".getBytes()))); verify(producerManager, times(0)).schedule(ArgumentMatchers.any()); - verify(producerManager, times(0)).reschedule(ArgumentMatchers.any()); + verify(producerManager, times(0)).reschedule(ArgumentMatchers.any()); verify(producerManager, times(0)).unschedule(ArgumentMatchers.any()); } @@ -82,7 +82,7 @@ public void assertChildEventWhenIsRootConfigPath() throws Exception { public void assertChildEventWhenStateIsAddAndIsConfigPathAndInvalidData() throws Exception { cloudJobConfigurationListener.childEvent(null, new TreeCacheEvent(TreeCacheEvent.Type.NODE_ADDED, new ChildData("/config/job/test_job", null, "".getBytes()))); verify(producerManager, times(0)).schedule(ArgumentMatchers.any()); - verify(producerManager, times(0)).reschedule(ArgumentMatchers.any()); + verify(producerManager, times(0)).reschedule(ArgumentMatchers.any()); verify(producerManager, times(0)).unschedule(ArgumentMatchers.any()); } @@ -96,7 +96,7 @@ public void assertChildEventWhenStateIsAddAndIsConfigPath() throws Exception { public void assertChildEventWhenStateIsUpdateAndIsConfigPathAndTransientJob() throws Exception { cloudJobConfigurationListener.childEvent(null, new TreeCacheEvent(TreeCacheEvent.Type.NODE_UPDATED, new ChildData("/config/job/test_job", null, CloudJsonConstants.getJobJson().getBytes()))); verify(readyService, times(0)).remove(Collections.singletonList("test_job")); - verify(producerManager).reschedule(ArgumentMatchers.any()); + verify(producerManager).reschedule(ArgumentMatchers.any()); } @Test @@ -104,7 +104,7 @@ public void assertChildEventWhenStateIsUpdateAndIsConfigPathAndDaemonJob() throw cloudJobConfigurationListener.childEvent(null, new TreeCacheEvent(TreeCacheEvent.Type.NODE_UPDATED, new ChildData("/config/job/test_job", null, CloudJsonConstants.getJobJson(CloudJobExecutionType.DAEMON).getBytes()))); verify(readyService).remove(Collections.singletonList("test_job")); - verify(producerManager).reschedule(ArgumentMatchers.any()); + verify(producerManager).reschedule(ArgumentMatchers.any()); } @Test @@ -112,7 +112,7 @@ public void assertChildEventWhenStateIsUpdateAndIsConfigPathAndMisfireDisabled() cloudJobConfigurationListener.childEvent(null, new TreeCacheEvent(TreeCacheEvent.Type.NODE_UPDATED, new ChildData("/config/job/test_job", null, CloudJsonConstants.getJobJson(false).getBytes()))); verify(readyService).setMisfireDisabled("test_job"); - verify(producerManager).reschedule(ArgumentMatchers.any()); + verify(producerManager).reschedule(ArgumentMatchers.any()); } @Test diff --git a/elastic-job-cloud/elastic-job-cloud-scheduler/src/test/java/com/dangdang/ddframe/job/cloud/scheduler/mesos/FacadeServiceTest.java b/elastic-job-cloud/elastic-job-cloud-scheduler/src/test/java/com/dangdang/ddframe/job/cloud/scheduler/mesos/FacadeServiceTest.java index 35e44ccded..50cc09c4fe 100644 --- a/elastic-job-cloud/elastic-job-cloud-scheduler/src/test/java/com/dangdang/ddframe/job/cloud/scheduler/mesos/FacadeServiceTest.java +++ b/elastic-job-cloud/elastic-job-cloud-scheduler/src/test/java/com/dangdang/ddframe/job/cloud/scheduler/mesos/FacadeServiceTest.java @@ -222,6 +222,7 @@ public void assertLoadJobConfigWhenAbsent() { @Test public void assertAddDaemonJobToReadyQueue() { + when(jobConfigService.load("test_job")).thenReturn(Optional.of(CloudJobConfigurationBuilder.createCloudJobConfiguration("test_job"))); facadeService.addDaemonJobToReadyQueue("test_job"); verify(readyService).addDaemon("test_job"); } diff --git a/elastic-job-cloud/elastic-job-cloud-scheduler/src/test/java/com/dangdang/ddframe/job/cloud/scheduler/restful/CloudJobRestfulApiTest.java b/elastic-job-cloud/elastic-job-cloud-scheduler/src/test/java/com/dangdang/ddframe/job/cloud/scheduler/restful/CloudJobRestfulApiTest.java index e1c902886b..1956a5da2e 100644 --- a/elastic-job-cloud/elastic-job-cloud-scheduler/src/test/java/com/dangdang/ddframe/job/cloud/scheduler/restful/CloudJobRestfulApiTest.java +++ b/elastic-job-cloud/elastic-job-cloud-scheduler/src/test/java/com/dangdang/ddframe/job/cloud/scheduler/restful/CloudJobRestfulApiTest.java @@ -106,7 +106,7 @@ public void assertUpdate() throws Exception { public void assertDeregister() throws Exception { when(getRegCenter().isExisted("/config/job/test_job")).thenReturn(false); assertThat(sentRequest("http://127.0.0.1:19000/api/job/deregister", "DELETE", "test_job"), is(204)); - verify(getRegCenter(), times(2)).get("/config/job/test_job"); + verify(getRegCenter(), times(3)).get("/config/job/test_job"); } @Test