From c06eaf155ae355961a1740df8098e501745f43f4 Mon Sep 17 00:00:00 2001 From: Justin Fossey Date: Thu, 30 Jul 2020 11:30:23 +0200 Subject: [PATCH] Version 2, created dedicated worker sub-classes and add support for more Laravel versions (#10) * Add copied dedicated worker class and update batch runner to use it * Update worker and batch runner to use the same worker options class * Update comments and fix class compatibility for stop if necessary method * Fix sytax error and work options class compatability * Update application container when building worker to support maintenance mode * Update PHP and Laravel verion numbers in composer to be more accurate * Minor doc comment update * Switch to strict types being enabled for all PHP files * Fix to use batch worker interface for bindings * Fix interface name * Minor white space change * Fix another spelling of our new batch worker interface * Copy laravel work command * Fix batch command constructor differences * Fix worker options constructor * Fix missing stop when empty param for worker options * Bug fix and attempt to workaround pcre.jit issues * Clean up laravel classes and copy required Looping class * Added workoptions laravel copy and updated and fixed use paths * Update incorrect usage for batch options vs work options * Fix work option constructor params and construction * Fix syntax error * Change to WorkOptions type hinting by default * Fix using laravel workoptions class when it should be ours * Fix batch command name * Minor formatting changes * Remove not required command constructor that should just inherit its constructor * Fix casting and return types * Update app spawn to verion 1.5 * Minor test return type update * Remove void keyword for older php 7 version support * Update and adapt prcessing and flow to new structure and get tests to pass * add function exist check for pcntl extension package * Update larvel version to be 5.5 to 7.0 in composer and CI * Optimize PHP version tests * Update README PHP and Laravel version support --- .travis.yml | 22 +- README.md | 12 +- composer.json | 6 +- phpunit.xml | 1 - src/BatchCommand.php | 45 +- src/BatchOptions.php | 46 +- src/BatchRunner.php | 146 ---- src/BatchWorker.php | 206 +++++ .../QueueButtlerBatchWorkerInterface.php | 10 + src/Exceptions/StopBatch.php | 2 + src/Laravel/Looping.php | 42 + src/Laravel/README.md | 14 + src/Laravel/WorkCommand.php | 245 ++++++ src/Laravel/Worker.php | 756 ++++++++++++++++++ src/Laravel/WorkerOptions.php | 88 ++ src/QueueButlerServiceProvider.php | 25 +- tests/Jobs/QueueBatchTestJob.php | 2 + tests/QueueBatchTest.php | 22 +- .../2017_09_13_140144_create_jobs_table.php | 2 + 19 files changed, 1471 insertions(+), 221 deletions(-) delete mode 100644 src/BatchRunner.php create mode 100644 src/BatchWorker.php create mode 100644 src/Contracts/QueueButtlerBatchWorkerInterface.php create mode 100644 src/Laravel/Looping.php create mode 100644 src/Laravel/README.md create mode 100644 src/Laravel/WorkCommand.php create mode 100644 src/Laravel/Worker.php create mode 100644 src/Laravel/WorkerOptions.php diff --git a/.travis.yml b/.travis.yml index f2d6178..22d4a89 100644 --- a/.travis.yml +++ b/.travis.yml @@ -12,24 +12,18 @@ cache: matrix: include: - # - php: 5.6 - # env: ILLUMINATE_VERSION=5.3.* - # - php: 5.6 - # env: ILLUMINATE_VERSION=5.4.* - - php: 7.0 - env: ILLUMINATE_VERSION=5.3.* - - php: 7.0 - env: ILLUMINATE_VERSION=5.4.* - php: 7.0 env: ILLUMINATE_VERSION=5.5.* - php: 7.1 - env: ILLUMINATE_VERSION=5.3.* - - php: 7.1 - env: ILLUMINATE_VERSION=5.4.* - - php: 7.1 - env: ILLUMINATE_VERSION=5.5.* - - php: 7.3 env: ILLUMINATE_VERSION=5.6.* + - php: 7.2 + env: ILLUMINATE_VERSION=5.7.* + - php: 7.3 + env: ILLUMINATE_VERSION=5.8.* + - php: 7.3 + env: ILLUMINATE_VERSION=6.* + - php: 7.4 + env: ILLUMINATE_VERSION=7.* # instal our framework version before_install: diff --git a/README.md b/README.md index e89243c..a5e6d16 100644 --- a/README.md +++ b/README.md @@ -9,14 +9,14 @@ Laravel Artisan commands that make it easy to run job queues using the Scheduler This is ideal for shared hosting or situations where you are not fully in control of the services or management of your hosting infrastructure and all you have access to is a Cron. -## Versions +## Versions Support Matrix -**Confirmed to be working:** +| QueueButler | PHP | Laravel | +| :---------: | :-------: | :-------: | +| 1.4 | 5.6 - 7.3 | 5.3 - 5.6 | +| 2.0 | 7.0 - 7.4 | 5.5 - 7.x | -* Laravel 5.3 -* Laravel 5.4 -* Laravel 5.5 -* Laravel 5.6 +**Note:** The PHP version support corresponds with the Laravel PHP support. ## Install diff --git a/composer.json b/composer.json index 5afe495..0ac5e9b 100644 --- a/composer.json +++ b/composer.json @@ -13,14 +13,14 @@ } ], "require": { - "php": "~5.6|~7.0", - "laravel/framework": "5.*" + "php": "~7.0", + "laravel/framework": ">=5.5" }, "require-dev": { "phpunit/phpunit": "~5.7", "squizlabs/php_codesniffer": "^2.3", "doctrine/dbal": "^2.5", - "web-chefs/laravel-app-spawn": "^1.0" + "web-chefs/laravel-app-spawn": ">=1.5" }, "autoload": { "psr-4": { diff --git a/phpunit.xml b/phpunit.xml index 9ca6f27..ca0c83f 100644 --- a/phpunit.xml +++ b/phpunit.xml @@ -1,5 +1,4 @@ - downForMaintenance()) { + if ($this->downForMaintenance() && $this->option('once')) { return $this->worker->sleep($this->option('sleep')); } @@ -71,7 +60,9 @@ public function fire() // connection being run for the queue operation currently being executed. $queue = $this->getQueue($connection); - $this->runWorker($connection, $queue); + return $this->runWorker( + $connection, $queue + ); } /** @@ -79,20 +70,22 @@ public function fire() * * @param string $connection * @param string $queue - * @return array + * + * @return integer */ protected function runWorker($connection, $queue) { $this->worker->setCache($this->laravel['cache']->driver()); + return $this->worker->batch( $connection, $queue, $this->gatherWorkerOptions() ); } /** * Gather all of the queue worker options as a single object. * - * @return \Illuminate\Queue\WorkerOptions + * @return BatchOptions */ - protected function gatherWorkerOptions() + protected function gatherWorkerOptions(): WorkerOptions { return new BatchOptions( $this->option('delay'), @@ -101,9 +94,9 @@ protected function gatherWorkerOptions() $this->option('sleep'), $this->option('tries'), $this->option('force'), + $this->option('stop-when-empty'), $this->option('time-limit'), $this->option('job-limit') ); } - } diff --git a/src/BatchOptions.php b/src/BatchOptions.php index 214f527..36a0dfa 100644 --- a/src/BatchOptions.php +++ b/src/BatchOptions.php @@ -1,13 +1,14 @@ timeLimit = $timeLimit; - $this->jobLimit = $jobLimit; + parent::__construct((int)$delay, + (int)$memory, + (int)$timeout, + (int)$sleep, + (int)$maxTries, + (bool)$force, + (bool)$stopWhenEmpty); + + $this->timeLimit = (int)$timeLimit; + $this->jobLimit = (int)$jobLimit; } - -} \ No newline at end of file +} diff --git a/src/BatchRunner.php b/src/BatchRunner.php deleted file mode 100644 index 3057286..0000000 --- a/src/BatchRunner.php +++ /dev/null @@ -1,146 +0,0 @@ -options = $options; - $this->startTime = microtime(true); - $this->jobCount = 0; - $this->daemon($connectionName, $queue, $options); - } - - /** - * Listen to the given queue in a loop. - * - * @param string $connectionName - * @param string $queue - * @param \Illuminate\Queue\WorkerOptions $options - * @return void - */ - public function daemon($connectionName, $queue, WorkerOptions $options) - { - try { - parent::daemon($connectionName, $queue, $options); - } - catch (StopBatch $e) { - // Check if the batch was cleanly stopped - // Then do nothing - } - } - - /** - * Raise the after queue job event. - * - * @param string $connectionName - * @param \Illuminate\Contracts\Queue\Job $job - * @return void - */ - protected function raiseAfterJobEvent($connectionName, $job) - { - $this->jobCount++; - parent::raiseAfterJobEvent($connectionName, $job); - $this->checkLimits(); - } - - /** - * Stop the process if necessary. - * - * @param WorkerOptions $options - * @param int $lastRestart - */ - protected function stopIfNecessary(WorkerOptions $options, $lastRestart) - { - parent::stopIfNecessary($options, $lastRestart); - $this->checkLimits(); - } - - /** - * Sleep the script for a given number of seconds. - * - * @param int $seconds - * @return void - */ - public function sleep($seconds) - { - $this->checkLimits(); - parent::sleep($seconds); - } - - /** - * Stop listening and bail out of the script. - * - * @return void - */ - public function stop($status = 0) - { - // Cleanly handle stopping a batch without resorting to killing the process - throw new StopBatch(); - } - - /** - * Check our batch limits and stop the command if we reach a limit. - * - * @param WorkerOptions $options - */ - protected function checkLimits() - { - if ($this->isTimeLimit($this->options->timeLimit) || $this->isJobLimit($this->options->jobLimit)) { - $this->stop(); - } - } - - /** - * Check if the batch timelimit has been reached. - * - * @param init $timeLimit - * - * @return boolean - */ - protected function isTimeLimit($timeLimit) - { - return (microtime(true) - $this->startTime) > $timeLimit; - } - - /** - * Check if the batch job limit has been reached. - * - * @param int $jobLimit - * - * @return boolean - */ - protected function isJobLimit($jobLimit) - { - return $this->jobCount >= $jobLimit; - } - -} \ No newline at end of file diff --git a/src/BatchWorker.php b/src/BatchWorker.php new file mode 100644 index 0000000..359b90b --- /dev/null +++ b/src/BatchWorker.php @@ -0,0 +1,206 @@ +options = $options; + $this->startTime = microtime(true); + $this->jobCount = 0; + $this->exitCode = null; + + return $this->batchDaemon($connectionName, $queue, $options); + } + + /** + * Listen to the given queue in a loop. + * + * @param string $connectionName + * @param string $queue + * @param \Illuminate\Queue\WorkerOptions $options + * + * @return void + */ + public function batchDaemon($connectionName, $queue, WorkerOptions $options) + { + if ($this->supportsAsyncSignals()) { + $this->listenForSignals(); + } + + $lastRestart = $this->getTimestampOfLastQueueRestart(); + + while (true) { + // Before reserving any jobs, we will make sure this queue is not paused and + // if it is we will just pause this worker for a given amount of time and + // make sure we do not need to kill this worker process off completely. + if (! $this->daemonShouldRun($options, $connectionName, $queue)) { + $this->pauseWorker($options, $lastRestart); + + continue; + } + + // First, we will attempt to get the next job off of the queue. We will also + // register the timeout handler and reset the alarm for this job so it is + // not stuck in a frozen state forever. Then, we can fire off this job. + $job = $this->getNextJob( + $this->manager->connection($connectionName), $queue + ); + + if ($this->supportsAsyncSignals()) { + $this->registerTimeoutHandler($job, $options); + } + + // If the daemon should run (not in maintenance mode, etc.), then we can run + // fire off this job for processing. Otherwise, we will need to sleep the + // worker so no more jobs are processed until they should be processed. + if ($job) { + $this->runJob($job, $connectionName, $options); + } else { + $this->sleep($options->sleep); + } + + if ($this->supportsAsyncSignals()) { + $this->resetTimeoutHandler(); + } + + // Finally, we will check to see if we have exceeded our memory limits or if + // the queue should restart based on other indications. If so, we'll stop + // this worker and let whatever is "monitoring" it restart the process. + $this->stopIfNecessary($options, $lastRestart, $job); + + // gracefully exit loop and batch if we have a exit code + if ($this->exitCode !== null) { + // return exit code + return $this->exitCode; + } + } + } + + /** + * Raise the after queue job event. + * + * @param string $connectionName + * @param \Illuminate\Contracts\Queue\Job $job + * + * @return void + */ + protected function raiseAfterJobEvent($connectionName, $job) + { + $this->jobCount++; + parent::raiseAfterJobEvent($connectionName, $job); + $this->checkLimits(); + } + + /** + * Stop the process if necessary. + * + * @param WorkerOptions $options + * @param int $lastRestart + * + * @return void + */ + protected function stopIfNecessary(WorkerOptions $options, $lastRestart, $job = null) + { + parent::stopIfNecessary($options, $lastRestart, $job); + $this->checkLimits(); + } + + /** + * Stop listening and bail out of the script. + * + * @param int $status + * + * @return void + */ + public function stop($status = 0) + { + $this->events->dispatch(new WorkerStopping($status)); + + // Cleanly handle stopping a batch without resorting to killing the process + // This is required for end to end testing + $this->exitCode = $status; + } + + /** + * Sleep the script for a given number of seconds. + * + * @param int $seconds + * + * @return void + */ + public function sleep($seconds) + { + $this->checkLimits(); + parent::sleep($seconds); + } + + /** + * Check our batch limits and stop the command if we reach a limit. + * + * @param WorkerOptions $options + * + * @return void + */ + protected function checkLimits() + { + if ($this->isTimeLimit() || $this->isJobLimit()) { + $this->stop(); + } + } + + /** + * Check if the batch timelimit has been reached. + * + * @return boolean + */ + protected function isTimeLimit(): bool + { + return (microtime(true) - $this->startTime) > $this->options->timeLimit; + } + + /** + * Check if the batch job limit has been reached. + * + * @return boolean + */ + protected function isJobLimit(): bool + { + return $this->jobCount >= $this->options->jobLimit; + } +} diff --git a/src/Contracts/QueueButtlerBatchWorkerInterface.php b/src/Contracts/QueueButtlerBatchWorkerInterface.php new file mode 100644 index 0000000..319d643 --- /dev/null +++ b/src/Contracts/QueueButtlerBatchWorkerInterface.php @@ -0,0 +1,10 @@ +queue = $queue; + $this->connectionName = $connectionName; + } +} diff --git a/src/Laravel/README.md b/src/Laravel/README.md new file mode 100644 index 0000000..fb00f1b --- /dev/null +++ b/src/Laravel/README.md @@ -0,0 +1,14 @@ +# Laravel Copies + +In an attempt to support a feature parity with Laravel we first extended from +Laravel's own framework classes. + +This very quickly lead to version in compatibilities where class interfaces +changed breaking compatibility. + +After considering a few options it was decided to copy the latest versions of +Laravel's classes here in the "Laravel" folder and then extend from them our own +customizations. + +This way we can support a the same features of Laravel but maintain a stable +class interface across multiple Laravel versions. diff --git a/src/Laravel/WorkCommand.php b/src/Laravel/WorkCommand.php new file mode 100644 index 0000000..5d3a0cc --- /dev/null +++ b/src/Laravel/WorkCommand.php @@ -0,0 +1,245 @@ +cache = $cache; + $this->worker = $worker; + } + + /** + * Execute the console command. + * + * @return void + */ + public function handle() + { + if ($this->downForMaintenance() && $this->option('once')) { + return $this->worker->sleep($this->option('sleep')); + } + + // We'll listen to the processed and failed events so we can write information + // to the console as jobs are processed, which will let the developer watch + // which jobs are coming through a queue and be informed on its progress. + $this->listenForEvents(); + + $connection = $this->argument('connection') + ?: $this->laravel['config']['queue.default']; + + // We need to get the right queue for the connection which is set in the queue + // configuration file for the application. We will pull it based on the set + // connection being run for the queue operation currently being executed. + $queue = $this->getQueue($connection); + + $this->runWorker( + $connection, $queue + ); + } + + /** + * Run the worker instance. + * + * @param string $connection + * @param string $queue + * @return array + */ + protected function runWorker($connection, $queue) + { + $this->worker->setCache($this->cache); + + return $this->worker->{$this->option('once') ? 'runNextJob' : 'daemon'}( + $connection, $queue, $this->gatherWorkerOptions() + ); + } + + /** + * Gather all of the queue worker options as a single object. + * + * @return \Illuminate\Queue\WorkerOptions + */ + protected function gatherWorkerOptions() + { + return new WorkerOptions( + $this->option('delay'), + $this->option('memory'), + $this->option('timeout'), + $this->option('sleep'), + $this->option('tries'), + $this->option('force'), + $this->option('stop-when-empty') + ); + } + + /** + * Listen for the queue events in order to update the console output. + * + * @return void + */ + protected function listenForEvents() + { + $this->laravel['events']->listen(JobProcessing::class, function ($event) { + $this->writeOutput($event->job, 'starting'); + }); + + $this->laravel['events']->listen(JobProcessed::class, function ($event) { + $this->writeOutput($event->job, 'success'); + }); + + $this->laravel['events']->listen(JobFailed::class, function ($event) { + $this->writeOutput($event->job, 'failed'); + + $this->logFailedJob($event); + }); + } + + /** + * Write the status output for the queue worker. + * + * @param \Illuminate\Contracts\Queue\Job $job + * @param string $status + * @return void + */ + protected function writeOutput(Job $job, $status) + { + switch ($status) { + case 'starting': + return $this->writeStatus($job, 'Processing', 'comment'); + case 'success': + return $this->writeStatus($job, 'Processed', 'info'); + case 'failed': + return $this->writeStatus($job, 'Failed', 'error'); + } + } + + /** + * Format the status output for the queue worker. + * + * @param \Illuminate\Contracts\Queue\Job $job + * @param string $status + * @param string $type + * @return void + */ + protected function writeStatus(Job $job, $status, $type) + { + $this->output->writeln(sprintf( + "<{$type}>[%s][%s] %s %s", + Carbon::now()->format('Y-m-d H:i:s'), + $job->getJobId(), + str_pad("{$status}:", 11), $job->resolveName() + )); + } + + /** + * Store a failed job event. + * + * @param \Illuminate\Queue\Events\JobFailed $event + * @return void + */ + protected function logFailedJob(JobFailed $event) + { + $this->laravel['queue.failer']->log( + $event->connectionName, $event->job->getQueue(), + $event->job->getRawBody(), $event->exception + ); + } + + /** + * Get the queue name for the worker. + * + * @param string $connection + * @return string + */ + protected function getQueue($connection) + { + return $this->option('queue') ?: $this->laravel['config']->get( + "queue.connections.{$connection}.queue", 'default' + ); + } + + /** + * Determine if the worker should run in maintenance mode. + * + * @return bool + */ + protected function downForMaintenance() + { + return $this->option('force') ? false : $this->laravel->isDownForMaintenance(); + } +} diff --git a/src/Laravel/Worker.php b/src/Laravel/Worker.php new file mode 100644 index 0000000..7fc994d --- /dev/null +++ b/src/Laravel/Worker.php @@ -0,0 +1,756 @@ +events = $events; + $this->manager = $manager; + $this->exceptions = $exceptions; + $this->isDownForMaintenance = $isDownForMaintenance; + } + + /** + * Listen to the given queue in a loop. + * + * @param string $connectionName + * @param string $queue + * @param \Illuminate\Queue\WorkerOptions $options + * + * @return void + */ + public function daemon($connectionName, $queue, WorkerOptions $options) + { + if ($this->supportsAsyncSignals()) { + $this->listenForSignals(); + } + + $lastRestart = $this->getTimestampOfLastQueueRestart(); + + while (true) { + // Before reserving any jobs, we will make sure this queue is not paused and + // if it is we will just pause this worker for a given amount of time and + // make sure we do not need to kill this worker process off completely. + if (! $this->daemonShouldRun($options, $connectionName, $queue)) { + $this->pauseWorker($options, $lastRestart); + + continue; + } + + // First, we will attempt to get the next job off of the queue. We will also + // register the timeout handler and reset the alarm for this job so it is + // not stuck in a frozen state forever. Then, we can fire off this job. + $job = $this->getNextJob( + $this->manager->connection($connectionName), $queue + ); + + if ($this->supportsAsyncSignals()) { + $this->registerTimeoutHandler($job, $options); + } + + // If the daemon should run (not in maintenance mode, etc.), then we can run + // fire off this job for processing. Otherwise, we will need to sleep the + // worker so no more jobs are processed until they should be processed. + if ($job) { + $this->runJob($job, $connectionName, $options); + } else { + $this->sleep($options->sleep); + } + + if ($this->supportsAsyncSignals()) { + $this->resetTimeoutHandler(); + } + + // Finally, we will check to see if we have exceeded our memory limits or if + // the queue should restart based on other indications. If so, we'll stop + // this worker and let whatever is "monitoring" it restart the process. + $this->stopIfNecessary($options, $lastRestart, $job); + } + } + + /** + * Register the worker timeout handler. + * + * @param \Illuminate\Contracts\Queue\Job|null $job + * @param \Illuminate\Queue\WorkerOptions $options + * + * @return void + */ + protected function registerTimeoutHandler($job, WorkerOptions $options) + { + // We will register a signal handler for the alarm signal so that we can kill this + // process if it is running too long because it has frozen. This uses the async + // signals supported in recent versions of PHP to accomplish it conveniently. + pcntl_signal(SIGALRM, function () use ($job, $options) { + if ($job) { + $this->markJobAsFailedIfWillExceedMaxAttempts( + $job->getConnectionName(), $job, (int) $options->maxTries, $this->maxAttemptsExceededException($job) + ); + } + + $this->kill(1); + }); + + pcntl_alarm( + max($this->timeoutForJob($job, $options), 0) + ); + } + + /** + * Reset the worker timeout handler. + * + * @return void + */ + protected function resetTimeoutHandler() + { + pcntl_alarm(0); + } + + /** + * Get the appropriate timeout for the given job. + * + * @param \Illuminate\Contracts\Queue\Job|null $job + * @param \Illuminate\Queue\WorkerOptions $options + * + * @return int + */ + protected function timeoutForJob($job, WorkerOptions $options) + { + return $job && ! is_null($job->timeout()) ? $job->timeout() : $options->timeout; + } + + /** + * Determine if the daemon should process on this iteration. + * + * @param \Illuminate\Queue\WorkerOptions $options + * @param string $connectionName + * @param string $queue + * + * @return bool + */ + protected function daemonShouldRun(WorkerOptions $options, $connectionName, $queue) + { + return ! ((($this->isDownForMaintenance)() && ! $options->force) || + $this->paused || + $this->events->until(new Looping($connectionName, $queue)) === false); + } + + /** + * Pause the worker for the current loop. + * + * @param \Illuminate\Queue\WorkerOptions $options + * @param int $lastRestart + * + * @return void + */ + protected function pauseWorker(WorkerOptions $options, $lastRestart) + { + $this->sleep($options->sleep > 0 ? $options->sleep : 1); + + $this->stopIfNecessary($options, $lastRestart); + } + + /** + * Stop the process if necessary. + * + * @param \Illuminate\Queue\WorkerOptions $options + * @param int $lastRestart + * @param mixed $job + * + * @return void + */ + protected function stopIfNecessary(WorkerOptions $options, $lastRestart, $job = null) + { + if ($this->shouldQuit) { + $this->stop(); + } elseif ($this->memoryExceeded($options->memory)) { + $this->stop(12); + } elseif ($this->queueShouldRestart($lastRestart)) { + $this->stop(); + } elseif ($options->stopWhenEmpty && is_null($job)) { + $this->stop(); + } + } + + /** + * Process the next job on the queue. + * + * @param string $connectionName + * @param string $queue + * @param \Illuminate\Queue\WorkerOptions $options + * + * @return void + */ + public function runNextJob($connectionName, $queue, WorkerOptions $options) + { + $job = $this->getNextJob( + $this->manager->connection($connectionName), $queue + ); + + // If we're able to pull a job off of the stack, we will process it and then return + // from this method. If there is no job on the queue, we will "sleep" the worker + // for the specified number of seconds, then keep processing jobs after sleep. + if ($job) { + return $this->runJob($job, $connectionName, $options); + } + + $this->sleep($options->sleep); + } + + /** + * Get the next job from the queue connection. + * + * @param \Illuminate\Contracts\Queue\Queue $connection + * @param string $queue + * + * @return \Illuminate\Contracts\Queue\Job|null + */ + protected function getNextJob($connection, $queue) + { + try { + foreach (explode(',', $queue) as $queue) { + if (! is_null($job = $connection->pop($queue))) { + return $job; + } + } + } + catch (Throwable $e) { + $this->exceptions->report($e); + + $this->stopWorkerIfLostConnection($e); + + $this->sleep(1); + } + } + + /** + * Process the given job. + * + * @param \Illuminate\Contracts\Queue\Job $job + * @param string $connectionName + * @param \Illuminate\Queue\WorkerOptions $options + * + * @return void + */ + protected function runJob($job, $connectionName, WorkerOptions $options) + { + try { + return $this->process($connectionName, $job, $options); + } + catch (\Error $e) { + dd($e); + throw $e; + } + catch (Throwable $e) { + dd($e); + $this->exceptions->report($e); + + $this->stopWorkerIfLostConnection($e); + } + } + + /** + * Stop the worker if we have lost connection to a database. + * + * @param \Throwable $e + * + * @return void + */ + protected function stopWorkerIfLostConnection($e) + { + if ($this->causedByLostConnection($e)) { + $this->shouldQuit = true; + } + } + + /** + * Process the given job from the queue. + * + * @param string $connectionName + * @param \Illuminate\Contracts\Queue\Job $job + * @param \Illuminate\Queue\WorkerOptions $options + * + * @return void + * + * @throws \Throwable + */ + public function process($connectionName, $job, WorkerOptions $options) + { + try { + // First we will raise the before job event and determine if the job has already ran + // over its maximum attempt limits, which could primarily happen when this job is + // continually timing out and not actually throwing any exceptions from itself. + $this->raiseBeforeJobEvent($connectionName, $job); + + // dd($job); + + $this->markJobAsFailedIfAlreadyExceedsMaxAttempts( + $connectionName, $job, (int) $options->maxTries + ); + + if ($job->isDeleted()) { + return $this->raiseAfterJobEvent($connectionName, $job); + } + + // Here we will fire off the job and let it process. We will catch any exceptions so + // they can be reported to the developers logs, etc. Once the job is finished the + // proper events will be fired to let any listeners know this job has finished. + $job->fire(); + + $this->raiseAfterJobEvent($connectionName, $job); + } catch (Throwable $e) { + $this->handleJobException($connectionName, $job, $options, $e); + } + } + + /** + * Handle an exception that occurred while the job was running. + * + * @param string $connectionName + * @param \Illuminate\Contracts\Queue\Job $job + * @param \Illuminate\Queue\WorkerOptions $options + * @param \Throwable $e + * + * @return void + * + * @throws \Throwable + */ + protected function handleJobException($connectionName, $job, WorkerOptions $options, Throwable $e) + { + try { + // First, we will go ahead and mark the job as failed if it will exceed the maximum + // attempts it is allowed to run the next time we process it. If so we will just + // go ahead and mark it as failed now so we do not have to release this again. + if (! $job->hasFailed()) { + $this->markJobAsFailedIfWillExceedMaxAttempts( + $connectionName, $job, (int) $options->maxTries, $e + ); + + if (method_exists($job, 'uuid') && method_exists($job, 'maxExceptions')) { + $this->markJobAsFailedIfWillExceedMaxExceptions( + $connectionName, $job, $e + ); + } + } + + $this->raiseExceptionOccurredJobEvent( + $connectionName, $job, $e + ); + } finally { + // If we catch an exception, we will attempt to release the job back onto the queue + // so it is not lost entirely. This'll let the job be retried at a later time by + // another listener (or this same one). We will re-throw this exception after. + if (! $job->isDeleted() && ! $job->isReleased() && ! $job->hasFailed()) { + $job->release( + method_exists($job, 'delaySeconds') && ! is_null($job->delaySeconds()) + ? $job->delaySeconds() + : $options->delay + ); + } + } + + throw $e; + } + + /** + * Mark the given job as failed if it has exceeded the maximum allowed attempts. + * + * This will likely be because the job previously exceeded a timeout. + * + * @param string $connectionName + * @param \Illuminate\Contracts\Queue\Job $job + * @param int $maxTries + * + * @return void + * + * @throws \Throwable + */ + protected function markJobAsFailedIfAlreadyExceedsMaxAttempts($connectionName, $job, $maxTries) + { + $maxTries = ! is_null($job->maxTries()) ? $job->maxTries() : $maxTries; + + $timeoutAt = $job->timeoutAt(); + + if ($timeoutAt && Carbon::now()->getTimestamp() <= $timeoutAt) { + return; + } + + if (! $timeoutAt && ($maxTries === 0 || $job->attempts() <= $maxTries)) { + return; + } + + $this->failJob($job, $e = $this->maxAttemptsExceededException($job)); + + throw $e; + } + + /** + * Mark the given job as failed if it has exceeded the maximum allowed attempts. + * + * @param string $connectionName + * @param \Illuminate\Contracts\Queue\Job $job + * @param int $maxTries + * @param \Throwable $e + * + * @return void + */ + protected function markJobAsFailedIfWillExceedMaxAttempts($connectionName, $job, $maxTries, Throwable $e) + { + $maxTries = ! is_null($job->maxTries()) ? $job->maxTries() : $maxTries; + + if ($job->timeoutAt() && $job->timeoutAt() <= Carbon::now()->getTimestamp()) { + $this->failJob($job, $e); + } + + if ($maxTries > 0 && $job->attempts() >= $maxTries) { + $this->failJob($job, $e); + } + } + + /** + * Mark the given job as failed if it has exceeded the maximum allowed attempts. + * + * @param string $connectionName + * @param \Illuminate\Contracts\Queue\Job $job + * @param \Throwable $e + * + * @return void + */ + protected function markJobAsFailedIfWillExceedMaxExceptions($connectionName, $job, Throwable $e) + { + if (! $this->cache || is_null($uuid = $job->uuid()) || + is_null($maxExceptions = $job->maxExceptions())) { + return; + } + + if (! $this->cache->get('job-exceptions:'.$uuid)) { + $this->cache->put('job-exceptions:'.$uuid, 0, Carbon::now()->addDay()); + } + + if ($maxExceptions <= $this->cache->increment('job-exceptions:'.$uuid)) { + $this->cache->forget('job-exceptions:'.$uuid); + + $this->failJob($job, $e); + } + } + + /** + * Mark the given job as failed and raise the relevant event. + * + * @param \Illuminate\Contracts\Queue\Job $job + * @param \Throwable $e + * + * @return void + */ + protected function failJob($job, Throwable $e) + { + return $job->fail($e); + } + + /** + * Raise the before queue job event. + * + * @param string $connectionName + * @param \Illuminate\Contracts\Queue\Job $job + * + * @return void + */ + protected function raiseBeforeJobEvent($connectionName, $job) + { + $this->events->dispatch(new JobProcessing( + $connectionName, $job + )); + } + + /** + * Raise the after queue job event. + * + * @param string $connectionName + * @param \Illuminate\Contracts\Queue\Job $job + * + * @return void + */ + protected function raiseAfterJobEvent($connectionName, $job) + { + $this->events->dispatch(new JobProcessed( + $connectionName, $job + )); + } + + /** + * Raise the exception occurred queue job event. + * + * @param string $connectionName + * @param \Illuminate\Contracts\Queue\Job $job + * @param \Throwable $e + * + * @return void + */ + protected function raiseExceptionOccurredJobEvent($connectionName, $job, Throwable $e) + { + $this->events->dispatch(new JobExceptionOccurred( + $connectionName, $job, $e + )); + } + + /** + * Determine if the queue worker should restart. + * + * @param int|null $lastRestart + * @return bool + */ + protected function queueShouldRestart($lastRestart) + { + return $this->getTimestampOfLastQueueRestart() != $lastRestart; + } + + /** + * Get the last queue restart timestamp, or null. + * + * @return int|null + */ + protected function getTimestampOfLastQueueRestart() + { + if ($this->cache) { + return $this->cache->get('illuminate:queue:restart'); + } + } + + /** + * Enable async signals for the process. + * + * @return void + */ + protected function listenForSignals() + { + pcntl_async_signals(true); + + pcntl_signal(SIGTERM, function () { + $this->shouldQuit = true; + }); + + pcntl_signal(SIGUSR2, function () { + $this->paused = true; + }); + + pcntl_signal(SIGCONT, function () { + $this->paused = false; + }); + } + + /** + * Determine if "async" signals are supported. + * + * @return bool + */ + protected function supportsAsyncSignals() + { + return extension_loaded('pcntl') && function_exists('pcntl_async_signals'); + } + + /** + * Determine if the memory limit has been exceeded. + * + * @param int $memoryLimit + * + * @return bool + */ + public function memoryExceeded($memoryLimit) + { + return (memory_get_usage(true) / 1024 / 1024) >= $memoryLimit; + } + + /** + * Stop listening and bail out of the script. + * + * @param int $status + * + * @return void + */ + public function stop($status = 0) + { + $this->events->dispatch(new WorkerStopping($status)); + + exit($status); + } + + /** + * Kill the process. + * + * @param int $status + * @return void + */ + public function kill($status = 0) + { + $this->events->dispatch(new WorkerStopping($status)); + + if (extension_loaded('posix')) { + posix_kill(getmypid(), SIGKILL); + } + + exit($status); + } + + /** + * Create an instance of MaxAttemptsExceededException. + * + * @param \Illuminate\Contracts\Queue\Job $job + * + * @return \Illuminate\Queue\MaxAttemptsExceededException + */ + protected function maxAttemptsExceededException($job) + { + return new MaxAttemptsExceededException( + $job->resolveName().' has been attempted too many times or run too long. The job may have previously timed out.' + ); + } + + /** + * Sleep the script for a given number of seconds. + * + * @param int|float $seconds + * + * @return void + */ + public function sleep($seconds) + { + if ($seconds < 1) { + usleep($seconds * 1000000); + } else { + sleep($seconds); + } + } + + /** + * Set the cache repository implementation. + * + * @param \Illuminate\Contracts\Cache\Repository $cache + * + * @return void + */ + public function setCache(CacheContract $cache) + { + $this->cache = $cache; + } + + /** + * Get the queue manager instance. + * + * @return \Illuminate\Queue\QueueManager + */ + public function getManager() + { + return $this->manager; + } + + /** + * Set the queue manager instance. + * + * @param \Illuminate\Contracts\Queue\Factory $manager + * @return void + */ + public function setManager(QueueManager $manager) + { + $this->manager = $manager; + } +} diff --git a/src/Laravel/WorkerOptions.php b/src/Laravel/WorkerOptions.php new file mode 100644 index 0000000..dca5d07 --- /dev/null +++ b/src/Laravel/WorkerOptions.php @@ -0,0 +1,88 @@ +delay = $delay; + $this->sleep = $sleep; + $this->force = $force; + $this->memory = $memory; + $this->timeout = $timeout; + $this->maxTries = $maxTries; + $this->stopWhenEmpty = $stopWhenEmpty; + } +} diff --git a/src/QueueButlerServiceProvider.php b/src/QueueButlerServiceProvider.php index 63a3f32..98d50b4 100644 --- a/src/QueueButlerServiceProvider.php +++ b/src/QueueButlerServiceProvider.php @@ -1,16 +1,23 @@ app->bind(QueueButtlerBatchWorkerInterface::class, function ($app) { + $isDownForMaintenance = function () { + return $this->app->isDownForMaintenance(); + }; + + return new BatchWorker( + $app['queue'], + $app['events'], + $app[ExceptionHandler::class], + $isDownForMaintenance + ); + }); + $this->commands($this->commands); } - -} \ No newline at end of file +} diff --git a/tests/Jobs/QueueBatchTestJob.php b/tests/Jobs/QueueBatchTestJob.php index ba0a386..24efb1a 100644 --- a/tests/Jobs/QueueBatchTestJob.php +++ b/tests/Jobs/QueueBatchTestJob.php @@ -1,5 +1,7 @@ assertEquals(1, $queue->count()); // Validate the job is our job + // By popping the job we inadvertently increment the number of attempts $job = Queue::pop(); $this->assertEquals($job->resolveName(), QueueBatchTestJob::class); @@ -113,7 +126,9 @@ public function testBatchSinpleQueueProcessing() $job->release(); // Test Job queue processing using queue:batch - $this->artisan('queue:batch', ['--job-limit' => 1, '--time-limit' => 2]); + // Attempts needs to be 2 because we manually popped our job once + $options = ['--job-limit' => 1, '--time-limit' => 2, '--tries' => 2]; + $this->artisan('queue:batch', $options); $this->assertEquals(0, $queue->count()); $this->assertEquals($this->answerToken, $this->app->make('QueueBatchJobAnswer')); } @@ -135,7 +150,7 @@ public function testBatchMultiWithJobLimit() $queue = $this->queueTestDbQuery(); // Create 5 test jobs - collect(range(0,4))->each(function() { + collect(range(1,5))->each(function() { dispatch(new QueueBatchTestJob); }); @@ -149,7 +164,7 @@ public function testBatchMultiWithJobLimit() $this->assertEquals(2, $queue->count()); // Test Job queue processing the remaining jobs - $this->artisan('queue:batch', ['--job-limit' => 10, '--time-limit' => 5]); + $this->artisan('queue:batch', ['--job-limit' => 5, '--time-limit' => 5]); // Check queue is empty $this->assertEquals(0, $queue->count()); @@ -168,5 +183,4 @@ protected function setupJobAnswerToken() return $this->answerToken; }); } - } diff --git a/tests/migrations/2017_09_13_140144_create_jobs_table.php b/tests/migrations/2017_09_13_140144_create_jobs_table.php index ba80e8f..9dd866a 100644 --- a/tests/migrations/2017_09_13_140144_create_jobs_table.php +++ b/tests/migrations/2017_09_13_140144_create_jobs_table.php @@ -1,5 +1,7 @@