From f2beb2bbce11433283ba52744dbe7134aa55cbfa Mon Sep 17 00:00:00 2001 From: Taylor Otwell Date: Fri, 30 Dec 2016 14:21:25 -0600 Subject: [PATCH] Refactor the worker... remove exception that is not used. --- src/Illuminate/Queue/TimeoutException.php | 10 -- src/Illuminate/Queue/Worker.php | 191 ++++++++++++++-------- 2 files changed, 122 insertions(+), 79 deletions(-) delete mode 100644 src/Illuminate/Queue/TimeoutException.php diff --git a/src/Illuminate/Queue/TimeoutException.php b/src/Illuminate/Queue/TimeoutException.php deleted file mode 100644 index 1fe5e1e57985..000000000000 --- a/src/Illuminate/Queue/TimeoutException.php +++ /dev/null @@ -1,10 +0,0 @@ -enableAsnycSignals(); + $lastRestart = $this->getTimestampOfLastQueueRestart(); while (true) { + // First, we will attempt to get the next job off of the queue. We will also + // register the timeout handler and reset the alarm for this job so it is + // not stuck in a frozen state forever. Then, we can fire off this job. $job = $this->getNextJob( $this->manager->connection($connectionName), $queue ); $this->registerTimeoutHandler($job, $options); + // If the daemon should run (not in maintenance mode, etc.), then we can run + // fire off this job for processing. Otherwise, we will need to sleep the + // worker so no more jobs are processed until they should be processed. if ($job && $this->daemonShouldRun($options)) { $this->runJob($job, $connectionName, $options); } else { $this->sleep($options->sleep); } + // Finally, we will check to see if we have exceeded our memory limits or if + // the queue should restart based on other indications. If so, we'll stop + // this worker and let whatever is "monitoring" it restart the process. if ($this->memoryExceeded($options->memory) || $this->queueShouldRestart($lastRestart)) { $this->stop(); @@ -97,23 +108,29 @@ public function daemon($connectionName, $queue, WorkerOptions $options) */ protected function registerTimeoutHandler($job, WorkerOptions $options) { - if ($options->timeout == 0 || version_compare(PHP_VERSION, '7.1.0') < 0 || ! extension_loaded('pcntl')) { - return; + if ($options->timeout > 0 && $this->supportsAsyncSignals()) { + // We will register a signal handler for the alarm signal so that we can kill this + // process if it is running too long because it has frozen. This uses the async + // signals supported in recent versions of PHP to accomplish it conveniently. + pcntl_signal(SIGALRM, function () { + $this->kill(1); + }); + + pcntl_alarm($this->timeoutForJob($job, $options) + $options->sleep); } + } - $timeout = $job && ! is_null($job->timeout()) ? $job->timeout() : $options->timeout; - - pcntl_async_signals(true); - - pcntl_signal(SIGALRM, function () { - if (extension_loaded('posix')) { - posix_kill(getmypid(), SIGKILL); - } - - exit(1); - }); - - pcntl_alarm($timeout + $options->sleep); + /** + * Get the appropriate timeout for the given job. + * + * @param \Illuminate\Contracts\Queue\Job|null $job + * @param WorkerOptions $options + * @return int + */ + protected function timeoutForJob($job, WorkerOptions $options) + { + return $job && ! is_null($job->timeout()) + ? $job->timeout() : $options->timeout; } /** @@ -164,19 +181,20 @@ public function runNextJob($connectionName, $queue, WorkerOptions $options) } /** - * Process the given job. + * Get the next job from the queue connection. * - * @param \Illuminate\Contracts\Queue\Job $job - * @param string $connectionName - * @param \Illuminate\Queue\WorkerOptions $options - * @return void + * @param \Illuminate\Contracts\Queue\Queue $connection + * @param string $queue + * @return \Illuminate\Contracts\Queue\Job|null */ - protected function runJob($job, $connectionName, WorkerOptions $options) + protected function getNextJob($connection, $queue) { try { - return $this->process( - $connectionName, $job, $options - ); + foreach (explode(',', $queue) as $queue) { + if (! is_null($job = $connection->pop($queue))) { + return $job; + } + } } catch (Exception $e) { $this->exceptions->report($e); } catch (Throwable $e) { @@ -185,20 +203,17 @@ protected function runJob($job, $connectionName, WorkerOptions $options) } /** - * Get the next job from the queue connection. + * Process the given job. * - * @param \Illuminate\Contracts\Queue\Queue $connection - * @param string $queue - * @return \Illuminate\Contracts\Queue\Job|null + * @param \Illuminate\Contracts\Queue\Job $job + * @param string $connectionName + * @param \Illuminate\Queue\WorkerOptions $options + * @return void */ - protected function getNextJob($connection, $queue) + protected function runJob($job, $connectionName, WorkerOptions $options) { try { - foreach (explode(',', $queue) as $queue) { - if (! is_null($job = $connection->pop($queue))) { - return $job; - } - } + return $this->process($connectionName, $job, $options); } catch (Exception $e) { $this->exceptions->report($e); } catch (Throwable $e) { @@ -219,6 +234,9 @@ protected function getNextJob($connection, $queue) public function process($connectionName, $job, WorkerOptions $options) { try { + // First we will raise the before job event and determine if the job has already ran + // over the its maximum attempt limit, which could primarily happen if the job is + // continually timing out and not actually throwing any exceptions from itself. $this->raiseBeforeJobEvent($connectionName, $job); $this->markJobAsFailedIfAlreadyExceedsMaxAttempts( @@ -253,11 +271,11 @@ public function process($connectionName, $job, WorkerOptions $options) */ protected function handleJobException($connectionName, $job, WorkerOptions $options, $e) { - // If we catch an exception, we will attempt to release the job back onto the queue - // so it is not lost entirely. This'll let the job be retried at a later time by - // another listener (or this same one). We will re-throw this exception after. try { - $this->markJobAsFailedIfHasExceededMaxAttempts( + // First, we will go ahead and mark the job as failed if it will exceed the maximum + // attempts it is allowed to run the next time we process it. If so we will just + // go ahead and mark it as failed now so we do not have to release this again. + $this->markJobAsFailedIfWillExceedMaxAttempts( $connectionName, $job, (int) $options->maxTries, $e ); @@ -265,6 +283,9 @@ protected function handleJobException($connectionName, $job, WorkerOptions $opti $connectionName, $job, $e ); } finally { + // If we catch an exception, we will attempt to release the job back onto the queue + // so it is not lost entirely. This'll let the job be retried at a later time by + // another listener (or this same one). We will re-throw this exception after. if (! $job->isDeleted()) { $job->release($options->delay); } @@ -291,11 +312,9 @@ protected function markJobAsFailedIfAlreadyExceedsMaxAttempts($connectionName, $ return; } - $e = new MaxAttemptsExceededException( + $this->failJob($connectionName, $job, $e = new MaxAttemptsExceededException( 'A queued job has been attempted too many times. The job may have previously timed out.' - ); - - $this->failJob($connectionName, $job, $e); + )); throw $e; } @@ -309,16 +328,13 @@ protected function markJobAsFailedIfAlreadyExceedsMaxAttempts($connectionName, $ * @param \Exception $e * @return void */ - protected function markJobAsFailedIfHasExceededMaxAttempts( - $connectionName, $job, $maxTries, $e - ) { + protected function markJobAsFailedIfWillExceedMaxAttempts($connectionName, $job, $maxTries, $e) + { $maxTries = ! is_null($job->maxTries()) ? $job->maxTries() : $maxTries; - if ($maxTries === 0 || $job->attempts() < $maxTries) { - return; + if ($maxTries > 0 && $job->attempts() >= $maxTries) { + $this->failJob($connectionName, $job, $e); } - - $this->failJob($connectionName, $job, $e); } /** @@ -392,6 +408,51 @@ protected function raiseFailedJobEvent($connectionName, $job, $e) )); } + /** + * Determine if the queue worker should restart. + * + * @param int|null $lastRestart + * @return bool + */ + protected function queueShouldRestart($lastRestart) + { + return $this->getTimestampOfLastQueueRestart() != $lastRestart; + } + + /** + * Get the last queue restart timestamp, or null. + * + * @return int|null + */ + protected function getTimestampOfLastQueueRestart() + { + if ($this->cache) { + return $this->cache->get('illuminate:queue:restart'); + } + } + + /** + * Enable async signals for the process. + * + * @return void + */ + protected function enableAsnycSignals() + { + if ($this->supportsAsyncSignals()) { + pcntl_async_signals(true); + } + } + + /** + * Determine if "async" signals are supported. + * + * @return bool + */ + protected function supportsAsyncSignals() + { + return version_compare(PHP_VERSION, '7.1.0') >= 0 && extension_loaded('pcntl'); + } + /** * Determine if the memory limit has been exceeded. * @@ -416,37 +477,29 @@ public function stop() } /** - * Sleep the script for a given number of seconds. + * Kill the process. * - * @param int $seconds + * @param int $status * @return void */ - public function sleep($seconds) + public function kill($status = 0) { - sleep($seconds); - } - - /** - * Get the last queue restart timestamp, or null. - * - * @return int|null - */ - protected function getTimestampOfLastQueueRestart() - { - if ($this->cache) { - return $this->cache->get('illuminate:queue:restart'); + if (extension_loaded('posix')) { + posix_kill(getmypid(), SIGKILL); } + + exit($status); } /** - * Determine if the queue worker should restart. + * Sleep the script for a given number of seconds. * - * @param int|null $lastRestart - * @return bool + * @param int $seconds + * @return void */ - protected function queueShouldRestart($lastRestart) + public function sleep($seconds) { - return $this->getTimestampOfLastQueueRestart() != $lastRestart; + sleep($seconds); } /**