diff --git a/src/Illuminate/Queue/Worker.php b/src/Illuminate/Queue/Worker.php index 3d3ab0508a12..771035fa7a29 100644 --- a/src/Illuminate/Queue/Worker.php +++ b/src/Illuminate/Queue/Worker.php @@ -123,7 +123,7 @@ public function daemon($connectionName, $queue, WorkerOptions $options) // Finally, we will check to see if we have exceeded our memory limits or if // the queue should restart based on other indications. If so, we'll stop // this worker and let whatever is "monitoring" it restart the process. - $this->stopIfNecessary($options, $lastRestart); + $this->stopIfNecessary($options, $lastRestart, $job); } } @@ -195,16 +195,17 @@ protected function pauseWorker(WorkerOptions $options, $lastRestart) * @param \Illuminate\Queue\WorkerOptions $options * @param int $lastRestart */ - protected function stopIfNecessary(WorkerOptions $options, $lastRestart) + protected function stopIfNecessary(WorkerOptions $options, $lastRestart, $job = null) { if ($this->shouldQuit) { $this->kill(); } - if ($this->memoryExceeded($options->memory)) { $this->stop(12); } elseif ($this->queueShouldRestart($lastRestart)) { $this->stop(); + } elseif ($options->stopOnEmptyQueue && is_null($job)) { + $this->stop(); } } diff --git a/src/Illuminate/Queue/WorkerOptions.php b/src/Illuminate/Queue/WorkerOptions.php index 3549f4414c8f..22f2a0298206 100644 --- a/src/Illuminate/Queue/WorkerOptions.php +++ b/src/Illuminate/Queue/WorkerOptions.php @@ -46,6 +46,14 @@ class WorkerOptions */ public $force; + /** + * Indicates if the worker should stop when queue is empty. + * + * @var bool + */ + public $stopOnEmptyQueue; + + /** * Create a new worker options instance. * @@ -57,7 +65,7 @@ class WorkerOptions * @param bool $force * @return void */ - public function __construct($delay = 0, $memory = 128, $timeout = 60, $sleep = 3, $maxTries = 0, $force = false) + public function __construct($delay = 0, $memory = 128, $timeout = 60, $sleep = 3, $maxTries = 0, $force = false, $stopOnEmptyQueue = false) { $this->delay = $delay; $this->sleep = $sleep; @@ -65,5 +73,6 @@ public function __construct($delay = 0, $memory = 128, $timeout = 60, $sleep = 3 $this->memory = $memory; $this->timeout = $timeout; $this->maxTries = $maxTries; + $this->stopOnEmptyQueue = $stopOnEmptyQueue; } } diff --git a/tests/Queue/QueueWorkerTest.php b/tests/Queue/QueueWorkerTest.php index e0bd890edd67..b27294cd1f7c 100755 --- a/tests/Queue/QueueWorkerTest.php +++ b/tests/Queue/QueueWorkerTest.php @@ -46,6 +46,32 @@ public function test_job_can_be_fired() $this->events->shouldHaveReceived('dispatch')->with(Mockery::type(JobProcessed::class))->once(); } + public function test_worker_can_work_until_queue_is_empty() + { + $workerOptions = new WorkerOptions; + $workerOptions->stopOnEmptyQueue = true; + + $worker = $this->getWorker('default', ['queue' => [ + $firstJob = new WorkerFakeJob, + $secondJob = new WorkerFakeJob, + ]]); + + $this->expectException(LoopBreakerException::class); + + $worker->daemon('default', 'queue', $workerOptions); + + $this->assertTrue($firstJob->fired); + + $this->assertTrue($secondJob->fired); + + $this->assertSame(0, $worker->stoppedWithStatus); + + $this->events->shouldHaveReceived('dispatch')->with(Mockery::type(JobProcessing::class))->twice(); + + $this->events->shouldHaveReceived('dispatch')->with(Mockery::type(JobProcessed::class))->twice(); + } + + public function test_job_can_be_fired_based_on_priority() { $worker = $this->getWorker('default', [ @@ -262,6 +288,18 @@ public function sleep($seconds) { $this->sleptFor = $seconds; } + + public function stop($status = 0) + { + $this->stoppedWithStatus = $status; + + throw new LoopBreakerException; + } + + public function daemonShouldRun(WorkerOptions $options, $connectionName, $queue) + { + return true; + } } class WorkerFakeManager extends \Illuminate\Queue\QueueManager @@ -403,4 +441,14 @@ public function setConnectionName($name) { $this->connectionName = $name; } + + public function timeout() + { + return time() + 60; + } +} + +class LoopBreakerException extends RuntimeException +{ } +