Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[5.4] Job based queue options #16257

Merged
merged 4 commits into from
Nov 4, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 44 additions & 0 deletions src/Illuminate/Queue/Jobs/Job.php
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,13 @@ abstract class Job
*/
protected $instance;

/**
* The command instance.
*
* @var mixed
*/
protected $command;

/**
* The IoC container instance.
*
Expand Down Expand Up @@ -234,6 +241,43 @@ public function payload()
return json_decode($this->getRawBody(), true);
}

/**
* The underlying command.
*
* @return mixed
*/
public function getCommand()
{
if ($this->command) {
return $this->command;
}

$payload = $this->payload();

return $this->command = isset($payload['data']['command'])
? unserialize($payload['data']['command']) : null;
}

/**
* The number of times to attempt a job.
*
* @return int
*/
public function retries()
{
return $this->getCommand() ? $this->getCommand()->retries : null;
}

/**
* The number of seconds the job can run.
*
* @return int
*/
public function timeout()
{
return $this->getCommand() ? $this->getCommand()->timeout : null;
}

/**
* Get the name of the queue the job belongs to.
*
Expand Down
88 changes: 58 additions & 30 deletions src/Illuminate/Queue/Worker.php
Original file line number Diff line number Diff line change
Expand Up @@ -70,10 +70,14 @@ public function daemon($connectionName, $queue, WorkerOptions $options)
$lastRestart = $this->getTimestampOfLastQueueRestart();

while (true) {
$this->registerTimeoutHandler($options);
$job = $this->getNextJob(
$this->manager->connection($connectionName), $queue
);

if ($this->daemonShouldRun()) {
$this->runNextJob($connectionName, $queue, $options);
$this->registerTimeoutHandler($job, $options);

if ($job && $this->daemonShouldRun()) {
$this->runJob($job, $connectionName, $options);
} else {
$this->sleep($options->sleep);
}
Expand All @@ -88,15 +92,18 @@ public function daemon($connectionName, $queue, WorkerOptions $options)
/**
* Register the worker timeout handler (PHP 7.1+).
*
* @param \Illuminate\Contracts\Queue\Job|null $job
* @param WorkerOptions $options
* @return void
*/
protected function registerTimeoutHandler(WorkerOptions $options)
protected function registerTimeoutHandler($job, WorkerOptions $options)
{
if (version_compare(PHP_VERSION, '7.1.0') < 0 || ! extension_loaded('pcntl')) {
return;
}

$timeout = $job && $job->timeout() !== null ? $job->timeout() : $options->timeout;

pcntl_async_signals(true);

pcntl_signal(SIGALRM, function () {
Expand All @@ -105,7 +112,7 @@ protected function registerTimeoutHandler(WorkerOptions $options)
exit(1);
});

pcntl_alarm($options->timeout + $options->sleep);
pcntl_alarm($timeout + $options->sleep);
}

/**
Expand Down Expand Up @@ -137,27 +144,40 @@ protected function daemonShouldRun()
* @return void
*/
public function runNextJob($connectionName, $queue, WorkerOptions $options)
{
$job = $this->getNextJob(
$this->manager->connection($connectionName), $queue
);

// If we're able to pull a job off of the stack, we will process it and then return
// from this method. If there is no job on the queue, we will "sleep" the worker
// for the specified number of seconds, then keep processing jobs after sleep.
if ($job) {
return $this->runJob($job, $connectionName, $options);
}

$this->sleep($options->sleep);
}

/**
* Process the given job.
*
* @param \Illuminate\Contracts\Queue\Job $job
* @param string $connectionName
* @param \Illuminate\Queue\WorkerOptions $options
* @return void
*/
protected function runJob($job, $connectionName, WorkerOptions $options)
{
try {
$job = $this->getNextJob(
$this->manager->connection($connectionName), $queue
return $this->process(
$connectionName, $job, $options
);

// If we're able to pull a job off of the stack, we will process it and then return
// from this method. If there is no job on the queue, we will "sleep" the worker
// for the specified number of seconds, then keep processing jobs after sleep.
if ($job) {
return $this->process(
$connectionName, $job, $options
);
}
} catch (Exception $e) {
$this->exceptions->report($e);
} catch (Throwable $e) {
$this->exceptions->report(new FatalThrowableError($e));
}

$this->sleep($options->sleep);
}

/**
Expand All @@ -169,10 +189,16 @@ public function runNextJob($connectionName, $queue, WorkerOptions $options)
*/
protected function getNextJob($connection, $queue)
{
foreach (explode(',', $queue) as $queue) {
if (! is_null($job = $connection->pop($queue))) {
return $job;
try {
foreach (explode(',', $queue) as $queue) {
if (! is_null($job = $connection->pop($queue))) {
return $job;
}
}
} catch (Exception $e) {
$this->exceptions->report($e);
} catch (Throwable $e) {
$this->exceptions->report(new FatalThrowableError($e));
}
}

Expand Down Expand Up @@ -255,6 +281,8 @@ protected function handleJobException($connectionName, $job, WorkerOptions $opti
*/
protected function markJobAsFailedIfAlreadyExceedsMaxAttempts($connectionName, $job, $maxTries)
{
$maxTries = $job->retries() !== null ? $job->retries() : $maxTries;

if ($maxTries === 0 || $job->attempts() <= $maxTries) {
return;
}
Expand All @@ -280,6 +308,8 @@ protected function markJobAsFailedIfAlreadyExceedsMaxAttempts($connectionName, $
protected function markJobAsFailedIfHasExceededMaxAttempts(
$connectionName, $job, $maxTries, $e
) {
$maxTries = $job->retries() !== null ? $job->retries() : $maxTries;

if ($maxTries === 0 || $job->attempts() < $maxTries) {
return;
}
Expand All @@ -301,16 +331,14 @@ protected function failJob($connectionName, $job, $e)
return;
}

try {
// If the job has failed, we will delete it, call the "failed" method and then call
// an event indicating the job has failed so it can be logged if needed. This is
// to allow every developer to better keep monitor of their failed queue jobs.
$job->delete();
// If the job has failed, we will delete it, call the "failed" method and then call
// an event indicating the job has failed so it can be logged if needed. This is
// to allow every developer to better keep monitor of their failed queue jobs.
$job->delete();

$job->failed($e);
} finally {
$this->raiseFailedJobEvent($connectionName, $job, $e);
}
$job->failed($e);

$this->raiseFailedJobEvent($connectionName, $job, $e);
}

/**
Expand Down
22 changes: 22 additions & 0 deletions tests/Queue/QueueWorkerTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,22 @@ public function test_job_is_failed_if_it_has_already_exceeded_max_attempts()
$this->events->shouldNotHaveReceived('fire', [Mockery::type(JobProcessed::class)]);
}

public function test_job_based_max_retries()
{
$job = new WorkerFakeJob(function ($job) {
$job->attempts++;
});
$job->attempts = 2;

$job->retries = 10;

$worker = $this->getWorker('default', ['queue' => [$job]]);
$worker->runNextJob('default', 'queue', $this->workerOptions(['maxTries' => 1]));

$this->assertFalse($job->deleted);
$this->assertNull($job->failedWith);
}

/**
* Helpers...
*/
Expand Down Expand Up @@ -223,6 +239,7 @@ class WorkerFakeJob
public $callback;
public $deleted = false;
public $releaseAfter;
public $retries;
public $attempts = 0;
public $failedWith;

Expand All @@ -243,6 +260,11 @@ public function payload()
return [];
}

public function retries()
{
return $this->retries;
}

public function delete()
{
$this->deleted = true;
Expand Down