Skip to content

Commit

Permalink
[10.x] Added JobPopping and JobPopped events (#46220)
Browse files Browse the repository at this point in the history
* Added JobPopping and JobPopped events

* pr-46220 styleci

* formatting

* Update Worker.php

---------

Co-authored-by: Taylor Otwell <[email protected]>
  • Loading branch information
mingalevme and taylorotwell authored Feb 25, 2023
1 parent 3b7f6d0 commit 864d2e5
Show file tree
Hide file tree
Showing 4 changed files with 114 additions and 5 deletions.
33 changes: 33 additions & 0 deletions src/Illuminate/Queue/Events/JobPopped.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
<?php

namespace Illuminate\Queue\Events;

class JobPopped
{
/**
* The connection name.
*
* @var string
*/
public $connectionName;

/**
* The job instance.
*
* @var \Illuminate\Contracts\Queue\Job|null
*/
public $job;

/**
* Create a new event instance.
*
* @param string $connectionName
* @param \Illuminate\Contracts\Queue\Job|null $job
* @return void
*/
public function __construct($connectionName, $job)
{
$this->connectionName = $connectionName;
$this->job = $job;
}
}
24 changes: 24 additions & 0 deletions src/Illuminate/Queue/Events/JobPopping.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
<?php

namespace Illuminate\Queue\Events;

class JobPopping
{
/**
* The connection name.
*
* @var string
*/
public $connectionName;

/**
* Create a new event instance.
*
* @param string $connectionName
* @return void
*/
public function __construct($connectionName)
{
$this->connectionName = $connectionName;
}
}
36 changes: 35 additions & 1 deletion src/Illuminate/Queue/Worker.php
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@
use Illuminate\Contracts\Queue\Factory as QueueManager;
use Illuminate\Database\DetectsLostConnections;
use Illuminate\Queue\Events\JobExceptionOccurred;
use Illuminate\Queue\Events\JobPopped;
use Illuminate\Queue\Events\JobPopping;
use Illuminate\Queue\Events\JobProcessed;
use Illuminate\Queue\Events\JobProcessing;
use Illuminate\Queue\Events\JobReleasedAfterException;
Expand Down Expand Up @@ -342,13 +344,20 @@ protected function getNextJob($connection, $queue)
return $connection->pop($queue);
};

$this->raiseBeforeJobPopEvent($connection->getConnectionName());

try {
if (isset(static::$popCallbacks[$this->name])) {
return (static::$popCallbacks[$this->name])($popJobCallback, $queue);
return tap(
(static::$popCallbacks[$this->name])($popJobCallback, $queue),
fn ($job) => $this->raiseAfterJobPopEvent($connection->getConnectionName(), $job)
);
}

foreach (explode(',', $queue) as $queue) {
if (! is_null($job = $popJobCallback($queue))) {
$this->raiseAfterJobPopEvent($connection->getConnectionName(), $job);

return $job;
}
}
Expand Down Expand Up @@ -601,6 +610,31 @@ protected function calculateBackoff($job, WorkerOptions $options)
return (int) ($backoff[$job->attempts() - 1] ?? last($backoff));
}

/**
* Raise the before job has been popped.
*
* @param string $connectionName
* @return void
*/
protected function raiseBeforeJobPopEvent($connectionName)
{
$this->events->dispatch(new JobPopping($connectionName));
}

/**
* Raise the after job has been popped.
*
* @param string $connectionName
* @param \Illuminate\Contracts\Queue\Job|null $job
* @return void
*/
protected function raiseAfterJobPopEvent($connectionName, $job)
{
$this->events->dispatch(new JobPopped(
$connectionName, $job
));
}

/**
* Raise the before queue job event.
*
Expand Down
26 changes: 22 additions & 4 deletions tests/Queue/QueueWorkerTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@
use Illuminate\Contracts\Events\Dispatcher;
use Illuminate\Contracts\Queue\Job as QueueJobContract;
use Illuminate\Queue\Events\JobExceptionOccurred;
use Illuminate\Queue\Events\JobPopped;
use Illuminate\Queue\Events\JobPopping;
use Illuminate\Queue\Events\JobProcessed;
use Illuminate\Queue\Events\JobProcessing;
use Illuminate\Queue\MaxAttemptsExceededException;
Expand Down Expand Up @@ -50,6 +52,8 @@ public function testJobCanBeFired()
$worker = $this->getWorker('default', ['queue' => [$job = new WorkerFakeJob]]);
$worker->runNextJob('default', 'queue', new WorkerOptions);
$this->assertTrue($job->fired);
$this->events->shouldHaveReceived('dispatch')->with(m::type(JobPopping::class))->once();
$this->events->shouldHaveReceived('dispatch')->with(m::type(JobPopped::class))->once();
$this->events->shouldHaveReceived('dispatch')->with(m::type(JobProcessing::class))->once();
$this->events->shouldHaveReceived('dispatch')->with(m::type(JobProcessed::class))->once();
}
Expand Down Expand Up @@ -118,7 +122,7 @@ public function testJobCanBeFiredBasedOnPriority()
public function testExceptionIsReportedIfConnectionThrowsExceptionOnJobPop()
{
$worker = new InsomniacWorker(
new WorkerFakeManager('default', new BrokenQueueConnection($e = new RuntimeException)),
new WorkerFakeManager('default', new BrokenQueueConnection('default', $e = new RuntimeException)),
$this->events,
$this->exceptionHandler,
function () {
Expand Down Expand Up @@ -377,7 +381,7 @@ private function getWorker($connectionName = 'default', $jobs = [], ?callable $i
private function workerDependencies($connectionName = 'default', $jobs = [], ?callable $isInMaintenanceMode = null)
{
return [
new WorkerFakeManager($connectionName, new WorkerFakeConnection($jobs)),
new WorkerFakeManager($connectionName, new WorkerFakeConnection($connectionName, $jobs)),
$this->events,
$this->exceptionHandler,
$isInMaintenanceMode ?? function () {
Expand Down Expand Up @@ -444,32 +448,46 @@ public function connection($name = null)

class WorkerFakeConnection
{
public $connectionName;
public $jobs = [];

public function __construct($jobs)
public function __construct($connectionName, $jobs)
{
$this->connectionName = $connectionName;
$this->jobs = $jobs;
}

public function pop($queue)
{
return array_shift($this->jobs[$queue]);
}

public function getConnectionName()
{
return $this->connectionName;
}
}

class BrokenQueueConnection
{
public $connectionName;
public $exception;

public function __construct($exception)
public function __construct($connectionName, $exception)
{
$this->connectionName = $connectionName;
$this->exception = $exception;
}

public function pop($queue)
{
throw $this->exception;
}

public function getConnectionName()
{
return $this->connectionName;
}
}

class WorkerFakeJob implements QueueJobContract
Expand Down

0 comments on commit 864d2e5

Please sign in to comment.