From 1afb69516402c7079f833b377be1ee7a3b773c5f Mon Sep 17 00:00:00 2001 From: Eugene Goroschenya Date: Tue, 13 Dec 2016 20:04:23 +0300 Subject: [PATCH] issue #93 - Fix the jobs recovering (on scheduler startup) blocks simple trigger after failover situation for job which was executing during JVM crash --- .../impl/jdbcjobstore/JobStoreSupport.java | 2 +- .../java/org/quartz/core/RecoverJobsTest.java | 133 ++++++++++++++++++ .../org/quartz/core/RecoverJobsTestJob.java | 48 +++++++ 3 files changed, 182 insertions(+), 1 deletion(-) create mode 100644 quartz-core/src/test/java/org/quartz/core/RecoverJobsTest.java create mode 100644 quartz-core/src/test/java/org/quartz/core/RecoverJobsTestJob.java diff --git a/quartz-core/src/main/java/org/quartz/impl/jdbcjobstore/JobStoreSupport.java b/quartz-core/src/main/java/org/quartz/impl/jdbcjobstore/JobStoreSupport.java index 8a1b3c763..1031762db 100644 --- a/quartz-core/src/main/java/org/quartz/impl/jdbcjobstore/JobStoreSupport.java +++ b/quartz-core/src/main/java/org/quartz/impl/jdbcjobstore/JobStoreSupport.java @@ -1035,7 +1035,7 @@ private void doUpdateOfMisfiredTrigger(Connection conn, OperableTrigger trig, bo schedSignaler.notifySchedulerListenersFinalized(trig); } else { storeTrigger(conn, trig, null, true, newStateIfNotComplete, - forceState, false); + forceState, recovering); } } diff --git a/quartz-core/src/test/java/org/quartz/core/RecoverJobsTest.java b/quartz-core/src/test/java/org/quartz/core/RecoverJobsTest.java new file mode 100644 index 000000000..fadc3ecdb --- /dev/null +++ b/quartz-core/src/test/java/org/quartz/core/RecoverJobsTest.java @@ -0,0 +1,133 @@ +/* + * Copyright 2013 Terracotta, Inc.. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.quartz.core; + +import org.junit.Assert; +import org.junit.Test; +import org.quartz.JobBuilder; +import org.quartz.JobExecutionContext; +import org.quartz.Scheduler; +import org.quartz.SchedulerException; +import org.quartz.SimpleScheduleBuilder; +import org.quartz.TriggerBuilder; +import org.quartz.impl.DirectSchedulerFactory; +import org.quartz.impl.jdbcjobstore.JdbcQuartzTestUtilities; +import org.quartz.impl.jdbcjobstore.JobStoreTX; + +import org.quartz.listeners.JobListenerSupport; +import org.quartz.simpl.SimpleThreadPool; +import org.quartz.utils.DBConnectionManager; + +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.concurrent.atomic.AtomicBoolean; + +/** + * @author https://github.com/eugene-goroschenya + */ +public class RecoverJobsTest { + + @Test + public void testRecoveringRepeatJobWhichIsFiredAndMisfiredAtTheSameTime() throws SchedulerException, SQLException, InterruptedException { + String dsName = "recoverJobsTest"; + JdbcQuartzTestUtilities.createDatabase(dsName); + try { + final JobStoreTX jobStore = new JobStoreTX(); + jobStore.setDataSource(dsName); + jobStore.setInstanceId("SINGLE_NODE_TEST"); + jobStore.setInstanceName(dsName); + jobStore.setMisfireThreshold(1000); + + DirectSchedulerFactory factory = DirectSchedulerFactory.getInstance(); + + factory.createScheduler(new SimpleThreadPool(1, Thread.NORM_PRIORITY), jobStore); + Scheduler scheduler = factory.getScheduler(); + + // run forever up to the first fail over situation + RecoverJobsTestJob.runForever = true; + + scheduler.scheduleJob( + JobBuilder.newJob(RecoverJobsTestJob.class) + .withIdentity("test") + .build(), + TriggerBuilder.newTrigger() + .withIdentity("test") + .withSchedule( + SimpleScheduleBuilder.simpleSchedule() + .withIntervalInMilliseconds(1000) + .repeatForever() + ).build() + ); + + scheduler.start(); + + // wait to be sure job is executing + Thread.sleep(2000); + + // emulate fail over situation + scheduler.shutdown(false); + + Statement st = DBConnectionManager.getInstance().getConnection(dsName).createStatement(); + try { + ResultSet rs1 = st.executeQuery("SELECT TRIGGER_STATE from QRTZ_TRIGGERS"); + rs1.next(); + // check that trigger is blocked after fail over situation + Assert.assertEquals("BLOCKED", rs1.getString(1)); + + ResultSet rs2 = st.executeQuery("SELECT count(*) from QRTZ_FIRED_TRIGGERS"); + rs2.next(); + // check that fired trigger remains after fail over situation + Assert.assertEquals(1, rs2.getLong(1)); + } finally { + st.close(); + } + + // stop job executing to not as part of emulation fail over situation + RecoverJobsTestJob.runForever = false; + + // emulate down time >> trigger interval - misfireThreshold + Thread.sleep(4000); + + final AtomicBoolean isJobRecovered = new AtomicBoolean(false); + factory.createScheduler(new SimpleThreadPool(1, Thread.NORM_PRIORITY), jobStore); + Scheduler recovery = factory.getScheduler(); + recovery.getListenerManager().addJobListener(new JobListenerSupport() { + @Override + public String getName() { + return RecoverJobsTest.class.getSimpleName(); + } + + @Override + public void jobToBeExecuted(JobExecutionContext context) { + isJobRecovered.set(true); + } + }); + recovery.start(); + + // wait to be sure recovered job was executed + Thread.sleep(2000); + + // wait job + recovery.shutdown(true); + + Assert.assertTrue(isJobRecovered.get()); + } finally { + JdbcQuartzTestUtilities.destroyDatabase(dsName); + } + } + +} diff --git a/quartz-core/src/test/java/org/quartz/core/RecoverJobsTestJob.java b/quartz-core/src/test/java/org/quartz/core/RecoverJobsTestJob.java new file mode 100644 index 000000000..1f7a2cb4d --- /dev/null +++ b/quartz-core/src/test/java/org/quartz/core/RecoverJobsTestJob.java @@ -0,0 +1,48 @@ +/* + * Copyright 2013 Terracotta, Inc.. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.quartz.core; + +import org.quartz.DisallowConcurrentExecution; +import org.quartz.Job; +import org.quartz.JobExecutionContext; +import org.quartz.JobExecutionException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * @author https://github.com/eugene-goroschenya + */ +@DisallowConcurrentExecution +public class RecoverJobsTestJob implements Job { + private static Logger _log = LoggerFactory.getLogger(RecoverJobsTestJob.class); + static boolean runForever = true; + + @Override + public void execute(JobExecutionContext context) throws JobExecutionException { + long now = System.currentTimeMillis(); + int tic = 0; + _log.info("Started - " + now); + try { + while (runForever) { + Thread.sleep(1000); + _log.info("Tic " + (++tic) + "- " + now); + } + _log.info("Stopped - " + now); + } catch (InterruptedException e) { + _log.info("Interrupted - " + now); + } + } +}