Skip to content

Commit

Permalink
Added job deduplication code
Browse files Browse the repository at this point in the history
Former-commit-id: 4f3fcd1
  • Loading branch information
richcar58 committed Dec 9, 2016
1 parent 13ae44d commit 47d779a
Show file tree
Hide file tree
Showing 8 changed files with 170 additions and 9 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package org.iplantc.service.jobs.dao;

import java.math.BigInteger;
import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.Date;
Expand Down Expand Up @@ -117,6 +118,85 @@ public static int publish(JobPhaseType phase, String jobUuid, String creator)
return rows;
}

/* ---------------------------------------------------------------------- */
/* hasPublishedJob: */
/* ---------------------------------------------------------------------- */
/** Determine if a job exists in the published table in the specified phase.
*
* @param phase the phase in which the job is published
* @param jobUuid the job id
* @return true if the job exists in the phase, false otherwise
* @throws JobException on error
*/
public static boolean hasPublishedJob(JobPhaseType phase, String jobUuid)
throws JobException
{
// ------------------------- Check Input -------------------------
// Null/empty checks.
if (phase == null) {
String msg = "Null job phase received.";
_log.error(msg);
throw new JobException(msg);
}
if (StringUtils.isBlank(jobUuid)) {
String msg = "No job uuid specified.";
_log.error(msg);
throw new JobException(msg);
}

// ------------------------- Call SQL ----------------------------
boolean exists = true; // assume job in table
try
{
Session session = HibernateUtil.getSession();
session.clear();
HibernateUtil.beginTransaction();

// Create the insert command.
String sql = "select count(*) from job_published " +
"where phase = :phase " +
"and job_uuid = :job_uuid";

// Fill in the placeholders.
Query qry = session.createSQLQuery(sql);
qry.setString("phase", phase.name());
qry.setString("job_uuid", jobUuid);
qry.setCacheable(false);
qry.setCacheMode(CacheMode.IGNORE);

// Issue the call.
Object result = qry.uniqueResult();
if (result != null) {
int count = ((BigInteger)result).intValue();
if (count == 0) exists = false;
}
}
catch (Exception e)
{
// Rollback transaction.
try {HibernateUtil.rollbackTransaction();}
catch (Exception e1){_log.error("Rollback failed.", e1);}

String msg = "Unable to query existence of job published for phase " +
phase.name() + " and job " + jobUuid + ".";
_log.error(msg);
throw new JobException(msg, e);
}
finally {
try {HibernateUtil.commitTransaction();}
catch (Exception e)
{
String msg = "Unable to commit published job existence query for phase " +
phase.name() + " and job " + jobUuid + ".";
_log.error(msg);
throw new JobException(msg, e);
}
}

// Return the number of rows affected.
return exists;
}

/* ---------------------------------------------------------------------- */
/* getPublishedJobs: */
/* ---------------------------------------------------------------------- */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -273,7 +273,7 @@ protected AbstractPhaseScheduler(JobPhaseType phaseType)
* phase scheduler republishes the job until the job's
* status changes.
*/
protected abstract boolean allowsRepublishing();
public abstract boolean allowsRepublishing();

/* ********************************************************************** */
/* Public Methods */
Expand Down Expand Up @@ -1178,9 +1178,10 @@ private void schedule()
toQueueableJSON(job).getBytes("UTF-8"));

// Let's not publish this job to a queue more than once in this phase.
// NOTE: A failure between the last statement and this statement may
// cause a job to run multiple times. We don't have any kind of
// distributed transaction manager coordinating the database and queues.
// NOTE: A failure between the last statement leaves information about this
// job in an inconsistent state. The worker thread that eventually
// services this job can resolve the situation. See
// AbstractPhaseWorker.detectDuplicateJob() for details.
if (!allowsRepublishing())
try {JobPublishedDao.publish(_phaseType, job.getUuid(), getSchedulerName());}
catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,5 +57,5 @@ protected List<JobStatusType> getPhaseTriggerStatuses()
/* allowsRepublishing: */
/* ---------------------------------------------------------------------- */
@Override
protected boolean allowsRepublishing(){return false;}
public boolean allowsRepublishing(){return false;}
}
Original file line number Diff line number Diff line change
Expand Up @@ -56,5 +56,5 @@ protected List<JobStatusType> getPhaseTriggerStatuses()
/* allowsRepublishing: */
/* ---------------------------------------------------------------------- */
@Override
protected boolean allowsRepublishing(){return true;}
public boolean allowsRepublishing(){return true;}
}
Original file line number Diff line number Diff line change
Expand Up @@ -58,5 +58,5 @@ protected List<JobStatusType> getPhaseTriggerStatuses()
/* allowsRepublishing: */
/* ---------------------------------------------------------------------- */
@Override
protected boolean allowsRepublishing(){return false;}
public boolean allowsRepublishing(){return false;}
}
Original file line number Diff line number Diff line change
Expand Up @@ -58,5 +58,5 @@ protected List<JobStatusType> getPhaseTriggerStatuses()
/* allowsRepublishing: */
/* ---------------------------------------------------------------------- */
@Override
protected boolean allowsRepublishing(){return false;}
public boolean allowsRepublishing(){return false;}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,15 @@
import org.iplantc.service.common.persistence.TenancyHelper;
import org.iplantc.service.jobs.Settings;
import org.iplantc.service.jobs.dao.JobDao;
import org.iplantc.service.jobs.dao.JobPublishedDao;
import org.iplantc.service.jobs.exceptions.JobException;
import org.iplantc.service.jobs.exceptions.JobFinishedException;
import org.iplantc.service.jobs.exceptions.JobWorkerException;
import org.iplantc.service.jobs.exceptions.QuotaViolationException;
import org.iplantc.service.jobs.managers.JobManager;
import org.iplantc.service.jobs.managers.JobQuotaCheck;
import org.iplantc.service.jobs.model.Job;
import org.iplantc.service.jobs.model.JobPublished;
import org.iplantc.service.jobs.model.enumerations.JobPhaseType;
import org.iplantc.service.jobs.model.enumerations.JobStatusType;
import org.iplantc.service.jobs.phases.JobInterruptUtils;
Expand Down Expand Up @@ -285,7 +287,6 @@ else if (command == JobCommand.NOOP) {
else {
// Reject this unreadable message so that
// it gets discarded or dead-lettered.
// TODO: Double check reject semantics
boolean requeue = false;
try {_jobChannel.basicReject(envelope.getDeliveryTag(), requeue);}
catch (IOException e) {
Expand Down Expand Up @@ -402,6 +403,16 @@ protected boolean doProcessJob(Envelope envelope,
TenancyHelper.setCurrentTenantId(job.getTenantId());
TenancyHelper.setCurrentEndUser(job.getOwner());

// Make sure we are not receiving a duplicate job request
// for phase they don't allow duplicate requests.
if (!_scheduler.allowsRepublishing())
if (detectDuplicateJob(job)) {
String msg = "Worker " + getName() + " detected duplicate job " +
job.getUuid() + " (" + job.getName() + ").";
_log.error(msg);
return false;
}

// Invoke the phase processor.
boolean jobProcessed = true;
try {processJob(job);}
Expand Down Expand Up @@ -686,6 +697,55 @@ protected void checkRetryPeriod(int days) throws JobWorkerException, JobExceptio
/* ********************************************************************** */
/* Private Methods */
/* ********************************************************************** */
/* ---------------------------------------------------------------------- */
/* detectDuplicateJob: */
/* ---------------------------------------------------------------------- */
/** Detect whether a duplicate job was received. Duplicate jobs can be
* received if the scheduler fails after publishing a job to a queue but
* before writing the job to the job_published table. In that case, the
* job may still be in a trigger state when a new or restarted scheduler
* begins processing. If the scheduler sees the job, it will republish
* it not knowing that it was previously published.
*
* This duplicate detection strategy mandates that worker to ignore jobs
* that are not tracked in the job_published table. This strategy works
* in all possible situations, each of which is analyzed here:
*
* 1. Nothing in the job record changed after the job was queued.
* - The job request will be ignored and the scheduler will eventually
* republish the job and insert the tracking record in the
* job_published table (unless another failure occurs).
* 2. The status of the job changes since it was queued.
* - Since the job is not yet being processed by any worker, the only
* status change possible is from an asynchronous request to stop,
* pause or delete the job. After the worker ignores the job request,
* the scheduler will not republish because the job is no longer
* in a trigger state. This is fine since the job is stopped.
*
* @param job the job whose duplicate status
* @return true if a duplicate was detected, false otherwise
* @throws JobException on error
*/
private boolean detectDuplicateJob(Job job)
{
// The default is to assume the worst since the job will eventually
// be requeued by the server if it's still in a trigger state.
boolean dup = true;
try {
// If the job is in the published table, its not a duplicate.
dup = !JobPublishedDao.hasPublishedJob(_scheduler.getPhaseType(),
job.getUuid());
}
catch (Exception e){
String msg = "Worker " + getName() + " unable to determine if job " +
job.getUuid() + " (" + job.getName() + ") was already published.";
_log.error(msg);
}

// False is the happy path result.
return dup;
}

/* ---------------------------------------------------------------------- */
/* initJobChannel: */
/* ---------------------------------------------------------------------- */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,12 +89,22 @@ public void interruptTest() throws JobException
"Unexpected number of published jobs for phase " +
JobPhaseType.STAGING.name());

// Check for an individual job record's existence.
boolean exists = JobPublishedDao.hasPublishedJob(list1.get(0).getPhase(),
list1.get(0).getJobUuid());
Assert.assertTrue(exists, "Expected job record's existence.");

// Delete a published job for the first phase.
int deleted = JobPublishedDao.deletePublishedJob(list1.get(0).getPhase(),
list1.get(0).getJobUuid());
Assert.assertEquals(deleted, 1,
"Unable to delete job: " + AgaveStringUtils.toString(list1.get(0)));

// Check for an individual job record's existence.
exists = JobPublishedDao.hasPublishedJob(list1.get(0).getPhase(),
list1.get(0).getJobUuid());
Assert.assertFalse(exists, "Expected job record's existence.");

// Retrieve all published job records for the first phase.
list1 = JobPublishedDao.getPublishedJobs(JobPhaseType.STAGING);
Assert.assertEquals(list1.size(), NUM_PUBLISHED - 1,
Expand All @@ -108,12 +118,22 @@ public void interruptTest() throws JobException
"Unexpected number of published jobs for phase " +
JobPhaseType.ARCHIVING.name());

// Check for an individual job record's existence.
exists = JobPublishedDao.hasPublishedJob(list2.get(0).getPhase(),
list2.get(0).getJobUuid());
Assert.assertTrue(exists, "Expected job record's existence.");

// Delete a published job for the second phase.
deleted = JobPublishedDao.deletePublishedJob(list2.get(0).getPhase(),
list2.get(0).getJobUuid());
Assert.assertEquals(deleted, 1,
"Unable to delete job: " + AgaveStringUtils.toString(list2.get(0)));

// Check for an individual job record's existence.
exists = JobPublishedDao.hasPublishedJob(list2.get(0).getPhase(),
list2.get(0).getJobUuid());
Assert.assertFalse(exists, "Expected job record's existence.");

// Retrieve all published job records for the second phase.
list2 = JobPublishedDao.getPublishedJobs(JobPhaseType.ARCHIVING);
Assert.assertEquals(list2.size(), NUM_PUBLISHED - 1,
Expand Down

0 comments on commit 47d779a

Please sign in to comment.