Skip to content

Commit

Permalink
Allow daemon to stop when there is no more jobs in the queue
Browse files Browse the repository at this point in the history
  • Loading branch information
deleugpn committed Aug 6, 2018
1 parent b49f638 commit 157a150
Show file tree
Hide file tree
Showing 3 changed files with 62 additions and 4 deletions.
7 changes: 4 additions & 3 deletions src/Illuminate/Queue/Worker.php
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ public function daemon($connectionName, $queue, WorkerOptions $options)
// Finally, we will check to see if we have exceeded our memory limits or if
// the queue should restart based on other indications. If so, we'll stop
// this worker and let whatever is "monitoring" it restart the process.
$this->stopIfNecessary($options, $lastRestart);
$this->stopIfNecessary($options, $lastRestart, $job);
}
}

Expand Down Expand Up @@ -195,16 +195,17 @@ protected function pauseWorker(WorkerOptions $options, $lastRestart)
* @param \Illuminate\Queue\WorkerOptions $options
* @param int $lastRestart
*/
protected function stopIfNecessary(WorkerOptions $options, $lastRestart)
protected function stopIfNecessary(WorkerOptions $options, $lastRestart, $job = null)
{
if ($this->shouldQuit) {
$this->kill();
}

if ($this->memoryExceeded($options->memory)) {
$this->stop(12);
} elseif ($this->queueShouldRestart($lastRestart)) {
$this->stop();
} elseif ($options->stopOnEmptyQueue && is_null($job)) {
$this->stop();
}
}

Expand Down
11 changes: 10 additions & 1 deletion src/Illuminate/Queue/WorkerOptions.php
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,14 @@ class WorkerOptions
*/
public $force;

/**
* Indicates if the worker should stop when queue is empty.
*
* @var bool
*/
public $stopOnEmptyQueue;


/**
* Create a new worker options instance.
*
Expand All @@ -57,13 +65,14 @@ class WorkerOptions
* @param bool $force
* @return void
*/
public function __construct($delay = 0, $memory = 128, $timeout = 60, $sleep = 3, $maxTries = 0, $force = false)
public function __construct($delay = 0, $memory = 128, $timeout = 60, $sleep = 3, $maxTries = 0, $force = false, $stopOnEmptyQueue = false)
{
$this->delay = $delay;
$this->sleep = $sleep;
$this->force = $force;
$this->memory = $memory;
$this->timeout = $timeout;
$this->maxTries = $maxTries;
$this->stopOnEmptyQueue = $stopOnEmptyQueue;
}
}
48 changes: 48 additions & 0 deletions tests/Queue/QueueWorkerTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,32 @@ public function test_job_can_be_fired()
$this->events->shouldHaveReceived('dispatch')->with(Mockery::type(JobProcessed::class))->once();
}

public function test_worker_can_work_until_queue_is_empty()
{
$workerOptions = new WorkerOptions;
$workerOptions->stopOnEmptyQueue = true;

$worker = $this->getWorker('default', ['queue' => [
$firstJob = new WorkerFakeJob,
$secondJob = new WorkerFakeJob,
]]);

$this->expectException(LoopBreakerException::class);

$worker->daemon('default', 'queue', $workerOptions);

$this->assertTrue($firstJob->fired);

$this->assertTrue($secondJob->fired);

$this->assertSame(0, $worker->stoppedWithStatus);

$this->events->shouldHaveReceived('dispatch')->with(Mockery::type(JobProcessing::class))->twice();

$this->events->shouldHaveReceived('dispatch')->with(Mockery::type(JobProcessed::class))->twice();
}


public function test_job_can_be_fired_based_on_priority()
{
$worker = $this->getWorker('default', [
Expand Down Expand Up @@ -262,6 +288,18 @@ public function sleep($seconds)
{
$this->sleptFor = $seconds;
}

public function stop($status = 0)
{
$this->stoppedWithStatus = $status;

throw new LoopBreakerException;
}

public function daemonShouldRun(WorkerOptions $options, $connectionName, $queue)
{
return true;
}
}

class WorkerFakeManager extends \Illuminate\Queue\QueueManager
Expand Down Expand Up @@ -403,4 +441,14 @@ public function setConnectionName($name)
{
$this->connectionName = $name;
}

public function timeout()
{
return time() + 60;
}
}

class LoopBreakerException extends RuntimeException
{
}

0 comments on commit 157a150

Please sign in to comment.