Skip to content

Commit

Permalink
Refactor the worker... remove exception that is not used.
Browse files Browse the repository at this point in the history
  • Loading branch information
taylorotwell committed Dec 30, 2016
1 parent 44c5a36 commit f2beb2b
Show file tree
Hide file tree
Showing 2 changed files with 122 additions and 79 deletions.
10 changes: 0 additions & 10 deletions src/Illuminate/Queue/TimeoutException.php

This file was deleted.

191 changes: 122 additions & 69 deletions src/Illuminate/Queue/Worker.php
Original file line number Diff line number Diff line change
Expand Up @@ -66,21 +66,32 @@ public function __construct(QueueManager $manager,
*/
public function daemon($connectionName, $queue, WorkerOptions $options)
{
$this->enableAsnycSignals();

$lastRestart = $this->getTimestampOfLastQueueRestart();

while (true) {
// First, we will attempt to get the next job off of the queue. We will also
// register the timeout handler and reset the alarm for this job so it is
// not stuck in a frozen state forever. Then, we can fire off this job.
$job = $this->getNextJob(
$this->manager->connection($connectionName), $queue
);

$this->registerTimeoutHandler($job, $options);

// If the daemon should run (not in maintenance mode, etc.), then we can run
// fire off this job for processing. Otherwise, we will need to sleep the
// worker so no more jobs are processed until they should be processed.
if ($job && $this->daemonShouldRun($options)) {
$this->runJob($job, $connectionName, $options);
} else {
$this->sleep($options->sleep);
}

// Finally, we will check to see if we have exceeded our memory limits or if
// the queue should restart based on other indications. If so, we'll stop
// this worker and let whatever is "monitoring" it restart the process.
if ($this->memoryExceeded($options->memory) ||
$this->queueShouldRestart($lastRestart)) {
$this->stop();
Expand All @@ -97,23 +108,29 @@ public function daemon($connectionName, $queue, WorkerOptions $options)
*/
protected function registerTimeoutHandler($job, WorkerOptions $options)
{
if ($options->timeout == 0 || version_compare(PHP_VERSION, '7.1.0') < 0 || ! extension_loaded('pcntl')) {
return;
if ($options->timeout > 0 && $this->supportsAsyncSignals()) {
// We will register a signal handler for the alarm signal so that we can kill this
// process if it is running too long because it has frozen. This uses the async
// signals supported in recent versions of PHP to accomplish it conveniently.
pcntl_signal(SIGALRM, function () {
$this->kill(1);
});

pcntl_alarm($this->timeoutForJob($job, $options) + $options->sleep);
}
}

$timeout = $job && ! is_null($job->timeout()) ? $job->timeout() : $options->timeout;

pcntl_async_signals(true);

pcntl_signal(SIGALRM, function () {
if (extension_loaded('posix')) {
posix_kill(getmypid(), SIGKILL);
}

exit(1);
});

pcntl_alarm($timeout + $options->sleep);
/**
* Get the appropriate timeout for the given job.
*
* @param \Illuminate\Contracts\Queue\Job|null $job
* @param WorkerOptions $options
* @return int
*/
protected function timeoutForJob($job, WorkerOptions $options)
{
return $job && ! is_null($job->timeout())
? $job->timeout() : $options->timeout;
}

/**
Expand Down Expand Up @@ -164,19 +181,20 @@ public function runNextJob($connectionName, $queue, WorkerOptions $options)
}

/**
* Process the given job.
* Get the next job from the queue connection.
*
* @param \Illuminate\Contracts\Queue\Job $job
* @param string $connectionName
* @param \Illuminate\Queue\WorkerOptions $options
* @return void
* @param \Illuminate\Contracts\Queue\Queue $connection
* @param string $queue
* @return \Illuminate\Contracts\Queue\Job|null
*/
protected function runJob($job, $connectionName, WorkerOptions $options)
protected function getNextJob($connection, $queue)
{
try {
return $this->process(
$connectionName, $job, $options
);
foreach (explode(',', $queue) as $queue) {
if (! is_null($job = $connection->pop($queue))) {
return $job;
}
}
} catch (Exception $e) {
$this->exceptions->report($e);
} catch (Throwable $e) {
Expand All @@ -185,20 +203,17 @@ protected function runJob($job, $connectionName, WorkerOptions $options)
}

/**
* Get the next job from the queue connection.
* Process the given job.
*
* @param \Illuminate\Contracts\Queue\Queue $connection
* @param string $queue
* @return \Illuminate\Contracts\Queue\Job|null
* @param \Illuminate\Contracts\Queue\Job $job
* @param string $connectionName
* @param \Illuminate\Queue\WorkerOptions $options
* @return void
*/
protected function getNextJob($connection, $queue)
protected function runJob($job, $connectionName, WorkerOptions $options)
{
try {
foreach (explode(',', $queue) as $queue) {
if (! is_null($job = $connection->pop($queue))) {
return $job;
}
}
return $this->process($connectionName, $job, $options);
} catch (Exception $e) {
$this->exceptions->report($e);
} catch (Throwable $e) {
Expand All @@ -219,6 +234,9 @@ protected function getNextJob($connection, $queue)
public function process($connectionName, $job, WorkerOptions $options)
{
try {
// First we will raise the before job event and determine if the job has already ran
// over the its maximum attempt limit, which could primarily happen if the job is
// continually timing out and not actually throwing any exceptions from itself.
$this->raiseBeforeJobEvent($connectionName, $job);

$this->markJobAsFailedIfAlreadyExceedsMaxAttempts(
Expand Down Expand Up @@ -253,18 +271,21 @@ public function process($connectionName, $job, WorkerOptions $options)
*/
protected function handleJobException($connectionName, $job, WorkerOptions $options, $e)
{
// If we catch an exception, we will attempt to release the job back onto the queue
// so it is not lost entirely. This'll let the job be retried at a later time by
// another listener (or this same one). We will re-throw this exception after.
try {
$this->markJobAsFailedIfHasExceededMaxAttempts(
// First, we will go ahead and mark the job as failed if it will exceed the maximum
// attempts it is allowed to run the next time we process it. If so we will just
// go ahead and mark it as failed now so we do not have to release this again.
$this->markJobAsFailedIfWillExceedMaxAttempts(
$connectionName, $job, (int) $options->maxTries, $e
);

$this->raiseExceptionOccurredJobEvent(
$connectionName, $job, $e
);
} finally {
// If we catch an exception, we will attempt to release the job back onto the queue
// so it is not lost entirely. This'll let the job be retried at a later time by
// another listener (or this same one). We will re-throw this exception after.
if (! $job->isDeleted()) {
$job->release($options->delay);
}
Expand All @@ -291,11 +312,9 @@ protected function markJobAsFailedIfAlreadyExceedsMaxAttempts($connectionName, $
return;
}

$e = new MaxAttemptsExceededException(
$this->failJob($connectionName, $job, $e = new MaxAttemptsExceededException(
'A queued job has been attempted too many times. The job may have previously timed out.'
);

$this->failJob($connectionName, $job, $e);
));

throw $e;
}
Expand All @@ -309,16 +328,13 @@ protected function markJobAsFailedIfAlreadyExceedsMaxAttempts($connectionName, $
* @param \Exception $e
* @return void
*/
protected function markJobAsFailedIfHasExceededMaxAttempts(
$connectionName, $job, $maxTries, $e
) {
protected function markJobAsFailedIfWillExceedMaxAttempts($connectionName, $job, $maxTries, $e)
{
$maxTries = ! is_null($job->maxTries()) ? $job->maxTries() : $maxTries;

if ($maxTries === 0 || $job->attempts() < $maxTries) {
return;
if ($maxTries > 0 && $job->attempts() >= $maxTries) {
$this->failJob($connectionName, $job, $e);
}

$this->failJob($connectionName, $job, $e);
}

/**
Expand Down Expand Up @@ -392,6 +408,51 @@ protected function raiseFailedJobEvent($connectionName, $job, $e)
));
}

/**
* Determine if the queue worker should restart.
*
* @param int|null $lastRestart
* @return bool
*/
protected function queueShouldRestart($lastRestart)
{
return $this->getTimestampOfLastQueueRestart() != $lastRestart;
}

/**
* Get the last queue restart timestamp, or null.
*
* @return int|null
*/
protected function getTimestampOfLastQueueRestart()
{
if ($this->cache) {
return $this->cache->get('illuminate:queue:restart');
}
}

/**
* Enable async signals for the process.
*
* @return void
*/
protected function enableAsnycSignals()
{
if ($this->supportsAsyncSignals()) {
pcntl_async_signals(true);
}
}

/**
* Determine if "async" signals are supported.
*
* @return bool
*/
protected function supportsAsyncSignals()
{
return version_compare(PHP_VERSION, '7.1.0') >= 0 && extension_loaded('pcntl');
}

/**
* Determine if the memory limit has been exceeded.
*
Expand All @@ -416,37 +477,29 @@ public function stop()
}

/**
* Sleep the script for a given number of seconds.
* Kill the process.
*
* @param int $seconds
* @param int $status
* @return void
*/
public function sleep($seconds)
public function kill($status = 0)
{
sleep($seconds);
}

/**
* Get the last queue restart timestamp, or null.
*
* @return int|null
*/
protected function getTimestampOfLastQueueRestart()
{
if ($this->cache) {
return $this->cache->get('illuminate:queue:restart');
if (extension_loaded('posix')) {
posix_kill(getmypid(), SIGKILL);
}

exit($status);
}

/**
* Determine if the queue worker should restart.
* Sleep the script for a given number of seconds.
*
* @param int|null $lastRestart
* @return bool
* @param int $seconds
* @return void
*/
protected function queueShouldRestart($lastRestart)
public function sleep($seconds)
{
return $this->getTimestampOfLastQueueRestart() != $lastRestart;
sleep($seconds);
}

/**
Expand Down

0 comments on commit f2beb2b

Please sign in to comment.