From 41ce26a58ea239cac58222fab3ad42ad8f65ce09 Mon Sep 17 00:00:00 2001 From: Mohamed Said Date: Wed, 18 Nov 2020 14:54:32 +0200 Subject: [PATCH 01/11] delay pushing jobs to queue until database transactions are committed --- .../Database/Concerns/ManagesTransactions.php | 75 +++++++++++++++++-- src/Illuminate/Database/Connection.php | 14 ++++ src/Illuminate/Queue/BeanstalkdQueue.php | 26 ++++--- src/Illuminate/Queue/Queue.php | 20 +++++ src/Illuminate/Queue/RedisQueue.php | 23 +++--- src/Illuminate/Queue/SqsQueue.php | 20 +++-- tests/Database/DatabaseConnectionTest.php | 32 ++++++++ .../Integration/Queue/QueueConnectionTest.php | 52 +++++++++++++ tests/Queue/QueueBeanstalkdQueueTest.php | 8 ++ tests/Queue/QueueRedisQueueTest.php | 8 ++ 10 files changed, 242 insertions(+), 36 deletions(-) create mode 100644 tests/Integration/Queue/QueueConnectionTest.php diff --git a/src/Illuminate/Database/Concerns/ManagesTransactions.php b/src/Illuminate/Database/Concerns/ManagesTransactions.php index 1dd4475290d6..7018ebee80fd 100644 --- a/src/Illuminate/Database/Concerns/ManagesTransactions.php +++ b/src/Illuminate/Database/Concerns/ManagesTransactions.php @@ -44,7 +44,7 @@ public function transaction(Closure $callback, $attempts = 1) $this->getPdo()->commit(); } - $this->transactions = max(0, $this->transactions - 1); + $this->decrementTransactionsCount(); } catch (Throwable $e) { $this->handleCommitTransactionException( $e, $currentAttempt, $attempts @@ -76,7 +76,7 @@ protected function handleTransactionException(Throwable $e, $currentAttempt, $ma // let the developer handle it in another way. We will decrement too. if ($this->causedByConcurrencyError($e) && $this->transactions > 1) { - $this->transactions--; + $this->decrementTransactionsCount(); throw $e; } @@ -105,7 +105,7 @@ public function beginTransaction() { $this->createTransaction(); - $this->transactions++; + $this->incrementTransactionsCount(); $this->fireConnectionEvent('beganTransaction'); } @@ -178,7 +178,7 @@ public function commit() $this->getPdo()->commit(); } - $this->transactions = max(0, $this->transactions - 1); + $this->decrementTransactionsCount(); $this->fireConnectionEvent('committed'); } @@ -195,7 +195,7 @@ public function commit() */ protected function handleCommitTransactionException(Throwable $e, $currentAttempt, $maxAttempts) { - $this->transactions = max(0, $this->transactions - 1); + $this->decrementTransactionsCount(); if ($this->causedByConcurrencyError($e) && $currentAttempt < $maxAttempts) { @@ -203,7 +203,7 @@ protected function handleCommitTransactionException(Throwable $e, $currentAttemp } if ($this->causedByLostConnection($e)) { - $this->transactions = 0; + $this->updateTransactionsCount(0); } throw $e; @@ -239,7 +239,7 @@ public function rollBack($toLevel = null) $this->handleRollBackException($e); } - $this->transactions = $toLevel; + $this->updateTransactionsCount($toLevel); $this->fireConnectionEvent('rollingBack'); } @@ -274,7 +274,7 @@ protected function performRollBack($toLevel) protected function handleRollBackException(Throwable $e) { if ($this->causedByLostConnection($e)) { - $this->transactions = 0; + $this->updateTransactionsCount(0); } throw $e; @@ -289,4 +289,63 @@ public function transactionLevel() { return $this->transactions; } + + /** + * Increment the number of transactions. + * + * @return void + */ + protected function incrementTransactionsCount() + { + $this->transactions++; + + static::$totalTransactions++; + } + + /** + * Decrement the number of transactions. + * + * @return void + */ + protected function decrementTransactionsCount() + { + $this->transactions = max(0, $this->transactions - 1); + + static::$totalTransactions = max(0, static::$totalTransactions - 1); + + if (static::$totalTransactions == 0) { + $this->callAfterTransactionCallbacks(); + } + } + + /** + * Update the number of transactions. + * + * @param integer $count + * @return void + */ + protected function updateTransactionsCount($count) + { + static::$totalTransactions = static::$totalTransactions + $count - $this->transactions; + + $this->transactions = $count; + + if (static::$totalTransactions == 0) { + $this->callAfterTransactionCallbacks(); + } + } + + /** + * Execute the after transaction callbacks. + * + * @return void + */ + protected function callAfterTransactionCallbacks() + { + foreach (static::$afterTransactionCallbacks as $callback) { + call_user_func($callback); + } + + static::$afterTransactionCallbacks= []; + } } diff --git a/src/Illuminate/Database/Connection.php b/src/Illuminate/Database/Connection.php index 98f12639713e..be903f5ac216 100755 --- a/src/Illuminate/Database/Connection.php +++ b/src/Illuminate/Database/Connection.php @@ -112,6 +112,20 @@ class Connection implements ConnectionInterface */ protected $transactions = 0; + /** + * The total number of active transactions. + * + * @var int + */ + public static $totalTransactions = 0; + + /** + * The callbacks to invoke after transactions. + * + * @var callable[] + */ + public static $afterTransactionCallbacks = []; + /** * Indicates if changes have been made to the database. * diff --git a/src/Illuminate/Queue/BeanstalkdQueue.php b/src/Illuminate/Queue/BeanstalkdQueue.php index 49c36bdac07f..8f89a9431961 100755 --- a/src/Illuminate/Queue/BeanstalkdQueue.php +++ b/src/Illuminate/Queue/BeanstalkdQueue.php @@ -90,9 +90,11 @@ public function push($job, $data = '', $queue = null) */ public function pushRaw($payload, $queue = null, array $options = []) { - return $this->pheanstalk->useTube($this->getQueue($queue))->put( - $payload, Pheanstalk::DEFAULT_PRIORITY, Pheanstalk::DEFAULT_DELAY, $this->timeToRun - ); + return $this->afterTransactions(function () use ($payload, $queue, $options) { + return $this->pheanstalk->useTube($this->getQueue($queue))->put( + $payload, Pheanstalk::DEFAULT_PRIORITY, Pheanstalk::DEFAULT_DELAY, $this->timeToRun + ); + }); } /** @@ -106,14 +108,16 @@ public function pushRaw($payload, $queue = null, array $options = []) */ public function later($delay, $job, $data = '', $queue = null) { - $pheanstalk = $this->pheanstalk->useTube($this->getQueue($queue)); - - return $pheanstalk->put( - $this->createPayload($job, $this->getQueue($queue), $data), - Pheanstalk::DEFAULT_PRIORITY, - $this->secondsUntil($delay), - $this->timeToRun - ); + return $this->afterTransactions(function () use ($delay, $job, $data, $queue) { + $pheanstalk = $this->pheanstalk->useTube($this->getQueue($queue)); + + return $pheanstalk->put( + $this->createPayload($job, $this->getQueue($queue), $data), + Pheanstalk::DEFAULT_PRIORITY, + $this->secondsUntil($delay), + $this->timeToRun + ); + }); } /** diff --git a/src/Illuminate/Queue/Queue.php b/src/Illuminate/Queue/Queue.php index e9a98c663869..132f1f7d092a 100755 --- a/src/Illuminate/Queue/Queue.php +++ b/src/Illuminate/Queue/Queue.php @@ -5,6 +5,7 @@ use Closure; use DateTimeInterface; use Illuminate\Container\Container; +use Illuminate\Database\Connection; use Illuminate\Support\InteractsWithTime; use Illuminate\Support\Str; @@ -255,6 +256,25 @@ protected function withCreatePayloadHooks($queue, array $payload) return $payload; } + /** + * Run the given callback after database transactions. + * + * @param callable $callback + * @return mixed + */ + protected function afterTransactions($callback) + { + if (Connection::$totalTransactions > 0) { + Connection::$afterTransactionCallbacks[] = function () use ($callback) { + $callback(); + }; + + return; + } + + return $callback(); + } + /** * Get the connection name for the queue. * diff --git a/src/Illuminate/Queue/RedisQueue.php b/src/Illuminate/Queue/RedisQueue.php index 19fc07589497..c71d74106401 100644 --- a/src/Illuminate/Queue/RedisQueue.php +++ b/src/Illuminate/Queue/RedisQueue.php @@ -5,6 +5,7 @@ use Illuminate\Contracts\Queue\ClearableQueue; use Illuminate\Contracts\Queue\Queue as QueueContract; use Illuminate\Contracts\Redis\Factory as Redis; +use Illuminate\Database\Connection; use Illuminate\Queue\Jobs\RedisJob; use Illuminate\Support\Str; @@ -121,12 +122,14 @@ public function push($job, $data = '', $queue = null) */ public function pushRaw($payload, $queue = null, array $options = []) { - $this->getConnection()->eval( - LuaScripts::push(), 2, $this->getQueue($queue), - $this->getQueue($queue).':notify', $payload - ); + return $this->afterTransactions(function () use ($payload, $queue, $options) { + $this->getConnection()->eval( + LuaScripts::push(), 2, $this->getQueue($queue), + $this->getQueue($queue).':notify', $payload + ); - return json_decode($payload, true)['id'] ?? null; + return json_decode($payload, true)['id'] ?? null; + }); } /** @@ -153,11 +156,13 @@ public function later($delay, $job, $data = '', $queue = null) */ protected function laterRaw($delay, $payload, $queue = null) { - $this->getConnection()->zadd( - $this->getQueue($queue).':delayed', $this->availableAt($delay), $payload - ); + return $this->afterTransactions(function () use ($delay, $payload, $queue) { + $this->getConnection()->zadd( + $this->getQueue($queue).':delayed', $this->availableAt($delay), $payload + ); - return json_decode($payload, true)['id'] ?? null; + return json_decode($payload, true)['id'] ?? null; + }); } /** diff --git a/src/Illuminate/Queue/SqsQueue.php b/src/Illuminate/Queue/SqsQueue.php index 8ec5fd110b45..62ec9d218999 100755 --- a/src/Illuminate/Queue/SqsQueue.php +++ b/src/Illuminate/Queue/SqsQueue.php @@ -96,9 +96,11 @@ public function push($job, $data = '', $queue = null) */ public function pushRaw($payload, $queue = null, array $options = []) { - return $this->sqs->sendMessage([ - 'QueueUrl' => $this->getQueue($queue), 'MessageBody' => $payload, - ])->get('MessageId'); + return $this->afterTransactions(function () use ($payload, $queue, $options) { + return $this->sqs->sendMessage([ + 'QueueUrl' => $this->getQueue($queue), 'MessageBody' => $payload, + ])->get('MessageId'); + }); } /** @@ -112,11 +114,13 @@ public function pushRaw($payload, $queue = null, array $options = []) */ public function later($delay, $job, $data = '', $queue = null) { - return $this->sqs->sendMessage([ - 'QueueUrl' => $this->getQueue($queue), - 'MessageBody' => $this->createPayload($job, $queue ?: $this->default, $data), - 'DelaySeconds' => $this->secondsUntil($delay), - ])->get('MessageId'); + return $this->afterTransactions(function () use ($data, $delay, $job, $queue) { + return $this->sqs->sendMessage([ + 'QueueUrl' => $this->getQueue($queue), + 'MessageBody' => $this->createPayload($job, $queue ?: $this->default, $data), + 'DelaySeconds' => $this->secondsUntil($delay), + ])->get('MessageId'); + }); } /** diff --git a/tests/Database/DatabaseConnectionTest.php b/tests/Database/DatabaseConnectionTest.php index 2be19458ed62..c0699cd3964d 100755 --- a/tests/Database/DatabaseConnectionTest.php +++ b/tests/Database/DatabaseConnectionTest.php @@ -425,6 +425,38 @@ public function testSchemaBuilderCanBeCreated() $this->assertSame($connection, $schema->getConnection()); } + public function testTotalTransactionsIsUpdated() + { + $pdo = $this->createMock(DatabaseConnectionTestMockPDO::class); + $connection = $this->getMockConnection([], $pdo); + + $connection->beginTransaction(); + $this->assertEquals(1, $connection::$totalTransactions); + + $connection->commit(); + $this->assertEquals(0, $connection::$totalTransactions); + } + + public function testAfterTransactionCallbacksAreCalled() + { + $pdo = $this->createMock(DatabaseConnectionTestMockPDO::class); + $connection = $this->getMockConnection([], $pdo); + + $name = 'mohamed'; + + $connection->beginTransaction(); + + Connection::$afterTransactionCallbacks[] = function () use (&$name) { + $name = 'zain'; + }; + + $this->assertEquals('mohamed', $name); + + $connection->commit(); + + $this->assertEquals('zain', $name); + } + protected function getMockConnection($methods = [], $pdo = null) { $pdo = $pdo ?: new DatabaseConnectionTestMockPDO; diff --git a/tests/Integration/Queue/QueueConnectionTest.php b/tests/Integration/Queue/QueueConnectionTest.php new file mode 100644 index 000000000000..41f10f9437a2 --- /dev/null +++ b/tests/Integration/Queue/QueueConnectionTest.php @@ -0,0 +1,52 @@ +set('app.debug', 'true'); + $app['config']->set('queue.default', 'sqs'); + } + + protected function tearDown(): void + { + QueueConnectionTestJob::$ran = false; + } + + public function testJobWontGetDispatchedInsideATransaction() + { + Connection::$totalTransactions = 1; + + Bus::dispatch(new QueueConnectionTestJob); + Bus::dispatch(new QueueConnectionTestJob); + + $this->assertFalse(QueueConnectionTestJob::$ran); + $this->assertCount(2, Connection::$afterTransactionCallbacks); + } +} + +class QueueConnectionTestJob implements ShouldQueue +{ + use Dispatchable, Queueable; + + public static $ran = false; + + public function handle() + { + static::$ran = true; + } +} diff --git a/tests/Queue/QueueBeanstalkdQueueTest.php b/tests/Queue/QueueBeanstalkdQueueTest.php index 7134917a2369..1d65da6a1050 100755 --- a/tests/Queue/QueueBeanstalkdQueueTest.php +++ b/tests/Queue/QueueBeanstalkdQueueTest.php @@ -3,6 +3,7 @@ namespace Illuminate\Tests\Queue; use Illuminate\Container\Container; +use Illuminate\Database\Connection; use Illuminate\Queue\BeanstalkdQueue; use Illuminate\Queue\Jobs\BeanstalkdJob; use Illuminate\Support\Str; @@ -13,6 +14,13 @@ class QueueBeanstalkdQueueTest extends TestCase { + protected function setUp(): void + { + Connection::$totalTransactions = 0; + + parent::setUp(); + } + protected function tearDown(): void { m::close(); diff --git a/tests/Queue/QueueRedisQueueTest.php b/tests/Queue/QueueRedisQueueTest.php index 2060772d78a8..7dc0ac55b294 100644 --- a/tests/Queue/QueueRedisQueueTest.php +++ b/tests/Queue/QueueRedisQueueTest.php @@ -3,6 +3,7 @@ namespace Illuminate\Tests\Queue; use Illuminate\Contracts\Redis\Factory; +use Illuminate\Database\Connection; use Illuminate\Queue\LuaScripts; use Illuminate\Queue\Queue; use Illuminate\Queue\RedisQueue; @@ -13,6 +14,13 @@ class QueueRedisQueueTest extends TestCase { + protected function setUp(): void + { + Connection::$totalTransactions = 0; + + parent::setUp(); + } + protected function tearDown(): void { m::close(); From 3e58430cb17163275acbbbc9a1bc8d0ce37e6ecb Mon Sep 17 00:00:00 2001 From: Mohamed Said Date: Wed, 18 Nov 2020 15:17:44 +0200 Subject: [PATCH 02/11] fix tests --- tests/Database/DatabaseConnectionTest.php | 6 ++++++ tests/Integration/Queue/QueueConnectionTest.php | 2 ++ 2 files changed, 8 insertions(+) diff --git a/tests/Database/DatabaseConnectionTest.php b/tests/Database/DatabaseConnectionTest.php index c0699cd3964d..5d452fb69eee 100755 --- a/tests/Database/DatabaseConnectionTest.php +++ b/tests/Database/DatabaseConnectionTest.php @@ -29,6 +29,8 @@ class DatabaseConnectionTest extends TestCase protected function tearDown(): void { m::close(); + + Connection::$afterTransactionCallbacks = []; } public function testSettingDefaultCallsGetDefaultGrammar() @@ -430,6 +432,8 @@ public function testTotalTransactionsIsUpdated() $pdo = $this->createMock(DatabaseConnectionTestMockPDO::class); $connection = $this->getMockConnection([], $pdo); + $connection::$totalTransactions = 0; + $connection->beginTransaction(); $this->assertEquals(1, $connection::$totalTransactions); @@ -442,6 +446,8 @@ public function testAfterTransactionCallbacksAreCalled() $pdo = $this->createMock(DatabaseConnectionTestMockPDO::class); $connection = $this->getMockConnection([], $pdo); + $connection::$totalTransactions = 0; + $name = 'mohamed'; $connection->beginTransaction(); diff --git a/tests/Integration/Queue/QueueConnectionTest.php b/tests/Integration/Queue/QueueConnectionTest.php index 41f10f9437a2..852d6e65d9a2 100644 --- a/tests/Integration/Queue/QueueConnectionTest.php +++ b/tests/Integration/Queue/QueueConnectionTest.php @@ -25,6 +25,8 @@ protected function getEnvironmentSetUp($app) protected function tearDown(): void { QueueConnectionTestJob::$ran = false; + Connection::$totalTransactions; + Connection::$afterTransactionCallbacks = []; } public function testJobWontGetDispatchedInsideATransaction() From a7fb7c53ab6fa36f6b5ecfbbf20bfa3435669a31 Mon Sep 17 00:00:00 2001 From: Mohamed Said Date: Wed, 18 Nov 2020 15:20:15 +0200 Subject: [PATCH 03/11] fix style --- src/Illuminate/Database/Concerns/ManagesTransactions.php | 4 ++-- src/Illuminate/Queue/BeanstalkdQueue.php | 2 +- src/Illuminate/Queue/RedisQueue.php | 2 +- tests/Integration/Queue/QueueConnectionTest.php | 2 -- 4 files changed, 4 insertions(+), 6 deletions(-) diff --git a/src/Illuminate/Database/Concerns/ManagesTransactions.php b/src/Illuminate/Database/Concerns/ManagesTransactions.php index 7018ebee80fd..995c7e438621 100644 --- a/src/Illuminate/Database/Concerns/ManagesTransactions.php +++ b/src/Illuminate/Database/Concerns/ManagesTransactions.php @@ -321,7 +321,7 @@ protected function decrementTransactionsCount() /** * Update the number of transactions. * - * @param integer $count + * @param int $count * @return void */ protected function updateTransactionsCount($count) @@ -346,6 +346,6 @@ protected function callAfterTransactionCallbacks() call_user_func($callback); } - static::$afterTransactionCallbacks= []; + static::$afterTransactionCallbacks = []; } } diff --git a/src/Illuminate/Queue/BeanstalkdQueue.php b/src/Illuminate/Queue/BeanstalkdQueue.php index 8f89a9431961..a00ce6208af4 100755 --- a/src/Illuminate/Queue/BeanstalkdQueue.php +++ b/src/Illuminate/Queue/BeanstalkdQueue.php @@ -90,7 +90,7 @@ public function push($job, $data = '', $queue = null) */ public function pushRaw($payload, $queue = null, array $options = []) { - return $this->afterTransactions(function () use ($payload, $queue, $options) { + return $this->afterTransactions(function () use ($payload, $queue) { return $this->pheanstalk->useTube($this->getQueue($queue))->put( $payload, Pheanstalk::DEFAULT_PRIORITY, Pheanstalk::DEFAULT_DELAY, $this->timeToRun ); diff --git a/src/Illuminate/Queue/RedisQueue.php b/src/Illuminate/Queue/RedisQueue.php index c71d74106401..fbb7efed6be0 100644 --- a/src/Illuminate/Queue/RedisQueue.php +++ b/src/Illuminate/Queue/RedisQueue.php @@ -122,7 +122,7 @@ public function push($job, $data = '', $queue = null) */ public function pushRaw($payload, $queue = null, array $options = []) { - return $this->afterTransactions(function () use ($payload, $queue, $options) { + return $this->afterTransactions(function () use ($payload, $queue) { $this->getConnection()->eval( LuaScripts::push(), 2, $this->getQueue($queue), $this->getQueue($queue).':notify', $payload diff --git a/tests/Integration/Queue/QueueConnectionTest.php b/tests/Integration/Queue/QueueConnectionTest.php index 852d6e65d9a2..a58814f3a44c 100644 --- a/tests/Integration/Queue/QueueConnectionTest.php +++ b/tests/Integration/Queue/QueueConnectionTest.php @@ -2,13 +2,11 @@ namespace Illuminate\Tests\Integration\Queue; -use Illuminate\Bus\Dispatcher; use Illuminate\Bus\Queueable; use Illuminate\Contracts\Queue\ShouldQueue; use Illuminate\Database\Connection; use Illuminate\Foundation\Bus\Dispatchable; use Illuminate\Support\Facades\Bus; -use Illuminate\Support\Facades\DB; use Orchestra\Testbench\TestCase; /** From f5655d92e95b1ac6ffd82e27c8d0c667d64026f5 Mon Sep 17 00:00:00 2001 From: Mohamed Said Date: Wed, 18 Nov 2020 15:22:04 +0200 Subject: [PATCH 04/11] fix style --- src/Illuminate/Queue/SqsQueue.php | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/Illuminate/Queue/SqsQueue.php b/src/Illuminate/Queue/SqsQueue.php index 62ec9d218999..670b45572245 100755 --- a/src/Illuminate/Queue/SqsQueue.php +++ b/src/Illuminate/Queue/SqsQueue.php @@ -96,7 +96,7 @@ public function push($job, $data = '', $queue = null) */ public function pushRaw($payload, $queue = null, array $options = []) { - return $this->afterTransactions(function () use ($payload, $queue, $options) { + return $this->afterTransactions(function () use ($payload, $queue) { return $this->sqs->sendMessage([ 'QueueUrl' => $this->getQueue($queue), 'MessageBody' => $payload, ])->get('MessageId'); From 4f5a62d6038dbe95a1bc0ba8fdcb3a443c7f4f83 Mon Sep 17 00:00:00 2001 From: Mohamed Said Date: Thu, 19 Nov 2020 12:23:52 +0200 Subject: [PATCH 05/11] rename method to enqueueUsing and make it more generic --- src/Illuminate/Queue/BeanstalkdQueue.php | 4 ++-- src/Illuminate/Queue/DatabaseQueue.php | 8 +++++--- src/Illuminate/Queue/Queue.php | 5 +++-- src/Illuminate/Queue/RedisQueue.php | 4 ++-- src/Illuminate/Queue/SqsQueue.php | 4 ++-- 5 files changed, 14 insertions(+), 11 deletions(-) diff --git a/src/Illuminate/Queue/BeanstalkdQueue.php b/src/Illuminate/Queue/BeanstalkdQueue.php index a00ce6208af4..03c9f44cd72f 100755 --- a/src/Illuminate/Queue/BeanstalkdQueue.php +++ b/src/Illuminate/Queue/BeanstalkdQueue.php @@ -90,7 +90,7 @@ public function push($job, $data = '', $queue = null) */ public function pushRaw($payload, $queue = null, array $options = []) { - return $this->afterTransactions(function () use ($payload, $queue) { + return $this->enqueueUsing($this, function () use ($payload, $queue) { return $this->pheanstalk->useTube($this->getQueue($queue))->put( $payload, Pheanstalk::DEFAULT_PRIORITY, Pheanstalk::DEFAULT_DELAY, $this->timeToRun ); @@ -108,7 +108,7 @@ public function pushRaw($payload, $queue = null, array $options = []) */ public function later($delay, $job, $data = '', $queue = null) { - return $this->afterTransactions(function () use ($delay, $job, $data, $queue) { + return $this->enqueueUsing($this, function () use ($delay, $job, $data, $queue) { $pheanstalk = $this->pheanstalk->useTube($this->getQueue($queue)); return $pheanstalk->put( diff --git a/src/Illuminate/Queue/DatabaseQueue.php b/src/Illuminate/Queue/DatabaseQueue.php index 89fb91cb3038..ae2de10767aa 100644 --- a/src/Illuminate/Queue/DatabaseQueue.php +++ b/src/Illuminate/Queue/DatabaseQueue.php @@ -159,9 +159,11 @@ public function release($queue, $job, $delay) */ protected function pushToDatabase($queue, $payload, $delay = 0, $attempts = 0) { - return $this->database->table($this->table)->insertGetId($this->buildDatabaseRecord( - $this->getQueue($queue), $payload, $this->availableAt($delay), $attempts - )); + return $this->enqueueUsing($this, function () use ($queue, $payload, $delay, $attempts) { + return $this->database->table($this->table)->insertGetId($this->buildDatabaseRecord( + $this->getQueue($queue), $payload, $this->availableAt($delay), $attempts + )); + }); } /** diff --git a/src/Illuminate/Queue/Queue.php b/src/Illuminate/Queue/Queue.php index 132f1f7d092a..8712a58c918b 100755 --- a/src/Illuminate/Queue/Queue.php +++ b/src/Illuminate/Queue/Queue.php @@ -259,12 +259,13 @@ protected function withCreatePayloadHooks($queue, array $payload) /** * Run the given callback after database transactions. * + * @param \Illuminate\Contracts\Queue\Queue $connection * @param callable $callback * @return mixed */ - protected function afterTransactions($callback) + protected function enqueueUsing($connection, $callback) { - if (Connection::$totalTransactions > 0) { + if (! $connection instanceof DatabaseQueue && Connection::$totalTransactions > 0) { Connection::$afterTransactionCallbacks[] = function () use ($callback) { $callback(); }; diff --git a/src/Illuminate/Queue/RedisQueue.php b/src/Illuminate/Queue/RedisQueue.php index fbb7efed6be0..45861748184c 100644 --- a/src/Illuminate/Queue/RedisQueue.php +++ b/src/Illuminate/Queue/RedisQueue.php @@ -122,7 +122,7 @@ public function push($job, $data = '', $queue = null) */ public function pushRaw($payload, $queue = null, array $options = []) { - return $this->afterTransactions(function () use ($payload, $queue) { + return $this->enqueueUsing($this, function () use ($payload, $queue) { $this->getConnection()->eval( LuaScripts::push(), 2, $this->getQueue($queue), $this->getQueue($queue).':notify', $payload @@ -156,7 +156,7 @@ public function later($delay, $job, $data = '', $queue = null) */ protected function laterRaw($delay, $payload, $queue = null) { - return $this->afterTransactions(function () use ($delay, $payload, $queue) { + return $this->enqueueUsing($this, function () use ($delay, $payload, $queue) { $this->getConnection()->zadd( $this->getQueue($queue).':delayed', $this->availableAt($delay), $payload ); diff --git a/src/Illuminate/Queue/SqsQueue.php b/src/Illuminate/Queue/SqsQueue.php index 670b45572245..ae9c673842b1 100755 --- a/src/Illuminate/Queue/SqsQueue.php +++ b/src/Illuminate/Queue/SqsQueue.php @@ -96,7 +96,7 @@ public function push($job, $data = '', $queue = null) */ public function pushRaw($payload, $queue = null, array $options = []) { - return $this->afterTransactions(function () use ($payload, $queue) { + return $this->enqueueUsing($this, function () use ($payload, $queue) { return $this->sqs->sendMessage([ 'QueueUrl' => $this->getQueue($queue), 'MessageBody' => $payload, ])->get('MessageId'); @@ -114,7 +114,7 @@ public function pushRaw($payload, $queue = null, array $options = []) */ public function later($delay, $job, $data = '', $queue = null) { - return $this->afterTransactions(function () use ($data, $delay, $job, $queue) { + return $this->enqueueUsing($this, function () use ($data, $delay, $job, $queue) { return $this->sqs->sendMessage([ 'QueueUrl' => $this->getQueue($queue), 'MessageBody' => $this->createPayload($job, $queue ?: $this->default, $data), From 5fb1350cf2acee2d4ada768da96ec7d0a3ea6664 Mon Sep 17 00:00:00 2001 From: Mohamed Said Date: Thu, 19 Nov 2020 12:38:57 +0200 Subject: [PATCH 06/11] update docblock --- src/Illuminate/Queue/Queue.php | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/Illuminate/Queue/Queue.php b/src/Illuminate/Queue/Queue.php index 8712a58c918b..88de9ddfd137 100755 --- a/src/Illuminate/Queue/Queue.php +++ b/src/Illuminate/Queue/Queue.php @@ -257,7 +257,7 @@ protected function withCreatePayloadHooks($queue, array $payload) } /** - * Run the given callback after database transactions. + * Enqueue a jobs using the given callback. * * @param \Illuminate\Contracts\Queue\Queue $connection * @param callable $callback From 8ed116c04c571638d06607c6edef9c810cc7da16 Mon Sep 17 00:00:00 2001 From: Mohamed Said Date: Mon, 23 Nov 2020 15:08:53 +0200 Subject: [PATCH 07/11] add a configuration value --- src/Illuminate/Queue/BeanstalkdQueue.php | 14 +++++++++++++- .../Queue/Connectors/BeanstalkdConnector.php | 3 ++- .../Queue/Connectors/DatabaseConnector.php | 3 ++- .../Queue/Connectors/RedisConnector.php | 3 ++- src/Illuminate/Queue/Connectors/SqsConnector.php | 6 +++++- src/Illuminate/Queue/DatabaseQueue.php | 14 +++++++++++++- src/Illuminate/Queue/Queue.php | 2 +- src/Illuminate/Queue/RedisQueue.php | 15 ++++++++++++++- src/Illuminate/Queue/SqsQueue.php | 14 +++++++++++++- tests/Integration/Queue/QueueConnectionTest.php | 1 + 10 files changed, 66 insertions(+), 9 deletions(-) diff --git a/src/Illuminate/Queue/BeanstalkdQueue.php b/src/Illuminate/Queue/BeanstalkdQueue.php index 03c9f44cd72f..804dac8c6c81 100755 --- a/src/Illuminate/Queue/BeanstalkdQueue.php +++ b/src/Illuminate/Queue/BeanstalkdQueue.php @@ -37,6 +37,13 @@ class BeanstalkdQueue extends Queue implements QueueContract */ protected $blockFor; + /** + * Indicate that pushes should be delayed till after database transactions commit. + * + * @var bool + */ + protected $pushAfterCommits = false; + /** * Create a new Beanstalkd queue instance. * @@ -46,12 +53,17 @@ class BeanstalkdQueue extends Queue implements QueueContract * @param int $blockFor * @return void */ - public function __construct(Pheanstalk $pheanstalk, $default, $timeToRun, $blockFor = 0) + public function __construct(Pheanstalk $pheanstalk, + $default, + $timeToRun, + $blockFor = 0, + $pushAfterCommits = false) { $this->default = $default; $this->blockFor = $blockFor; $this->timeToRun = $timeToRun; $this->pheanstalk = $pheanstalk; + $this->pushAfterCommits = $pushAfterCommits; } /** diff --git a/src/Illuminate/Queue/Connectors/BeanstalkdConnector.php b/src/Illuminate/Queue/Connectors/BeanstalkdConnector.php index b54d80193b69..f38e5b67a519 100755 --- a/src/Illuminate/Queue/Connectors/BeanstalkdConnector.php +++ b/src/Illuminate/Queue/Connectors/BeanstalkdConnector.php @@ -20,7 +20,8 @@ public function connect(array $config) $this->pheanstalk($config), $config['queue'], $config['retry_after'] ?? Pheanstalk::DEFAULT_TTR, - $config['block_for'] ?? 0 + $config['block_for'] ?? 0, + $config['after_commits'] ?? false ); } diff --git a/src/Illuminate/Queue/Connectors/DatabaseConnector.php b/src/Illuminate/Queue/Connectors/DatabaseConnector.php index 893a898f6b66..71cbdcadf1c1 100644 --- a/src/Illuminate/Queue/Connectors/DatabaseConnector.php +++ b/src/Illuminate/Queue/Connectors/DatabaseConnector.php @@ -37,7 +37,8 @@ public function connect(array $config) $this->connections->connection($config['connection'] ?? null), $config['table'], $config['queue'], - $config['retry_after'] ?? 60 + $config['retry_after'] ?? 60, + $config['after_commits'] ?? false ); } } diff --git a/src/Illuminate/Queue/Connectors/RedisConnector.php b/src/Illuminate/Queue/Connectors/RedisConnector.php index 1efe5f65e903..a1b6e32ec6d8 100644 --- a/src/Illuminate/Queue/Connectors/RedisConnector.php +++ b/src/Illuminate/Queue/Connectors/RedisConnector.php @@ -46,7 +46,8 @@ public function connect(array $config) $this->redis, $config['queue'], $config['connection'] ?? $this->connection, $config['retry_after'] ?? 60, - $config['block_for'] ?? null + $config['block_for'] ?? null, + $config['after_commits'] ?? false ); } } diff --git a/src/Illuminate/Queue/Connectors/SqsConnector.php b/src/Illuminate/Queue/Connectors/SqsConnector.php index 07d7f8232674..ca392bf360dc 100755 --- a/src/Illuminate/Queue/Connectors/SqsConnector.php +++ b/src/Illuminate/Queue/Connectors/SqsConnector.php @@ -23,7 +23,11 @@ public function connect(array $config) } return new SqsQueue( - new SqsClient($config), $config['queue'], $config['prefix'] ?? '', $config['suffix'] ?? '' + new SqsClient($config), + $config['queue'], + $config['prefix'] ?? '', + $config['suffix'] ?? '', + $config['after_commits'] ?? false ); } diff --git a/src/Illuminate/Queue/DatabaseQueue.php b/src/Illuminate/Queue/DatabaseQueue.php index ae2de10767aa..f19c4b962f45 100644 --- a/src/Illuminate/Queue/DatabaseQueue.php +++ b/src/Illuminate/Queue/DatabaseQueue.php @@ -40,6 +40,13 @@ class DatabaseQueue extends Queue implements QueueContract, ClearableQueue */ protected $retryAfter = 60; + /** + * Indicate that pushes should be delayed till after database transactions commit. + * + * @var bool + */ + protected $pushAfterCommits = false; + /** * Create a new database queue instance. * @@ -49,12 +56,17 @@ class DatabaseQueue extends Queue implements QueueContract, ClearableQueue * @param int $retryAfter * @return void */ - public function __construct(Connection $database, $table, $default = 'default', $retryAfter = 60) + public function __construct(Connection $database, + $table, + $default = 'default', + $retryAfter = 60, + $pushAfterCommits = false) { $this->table = $table; $this->default = $default; $this->database = $database; $this->retryAfter = $retryAfter; + $this->pushAfterCommits = $pushAfterCommits; } /** diff --git a/src/Illuminate/Queue/Queue.php b/src/Illuminate/Queue/Queue.php index 88de9ddfd137..016601675acc 100755 --- a/src/Illuminate/Queue/Queue.php +++ b/src/Illuminate/Queue/Queue.php @@ -265,7 +265,7 @@ protected function withCreatePayloadHooks($queue, array $payload) */ protected function enqueueUsing($connection, $callback) { - if (! $connection instanceof DatabaseQueue && Connection::$totalTransactions > 0) { + if ($connection->pushAfterCommits ?? false && Connection::$totalTransactions > 0) { Connection::$afterTransactionCallbacks[] = function () use ($callback) { $callback(); }; diff --git a/src/Illuminate/Queue/RedisQueue.php b/src/Illuminate/Queue/RedisQueue.php index 45861748184c..193da7190a91 100644 --- a/src/Illuminate/Queue/RedisQueue.php +++ b/src/Illuminate/Queue/RedisQueue.php @@ -46,6 +46,13 @@ class RedisQueue extends Queue implements QueueContract, ClearableQueue */ protected $blockFor = null; + /** + * Indicate that pushes should be delayed till after database transactions commit. + * + * @var bool + */ + protected $pushAfterCommits = false; + /** * Create a new Redis queue instance. * @@ -56,13 +63,19 @@ class RedisQueue extends Queue implements QueueContract, ClearableQueue * @param int|null $blockFor * @return void */ - public function __construct(Redis $redis, $default = 'default', $connection = null, $retryAfter = 60, $blockFor = null) + public function __construct(Redis $redis, + $default = 'default', + $connection = null, + $retryAfter = 60, + $blockFor = null, + $pushAfterCommits = false) { $this->redis = $redis; $this->default = $default; $this->blockFor = $blockFor; $this->connection = $connection; $this->retryAfter = $retryAfter; + $this->pushAfterCommits = $pushAfterCommits; } /** diff --git a/src/Illuminate/Queue/SqsQueue.php b/src/Illuminate/Queue/SqsQueue.php index ae9c673842b1..97d6d2353e42 100755 --- a/src/Illuminate/Queue/SqsQueue.php +++ b/src/Illuminate/Queue/SqsQueue.php @@ -38,6 +38,13 @@ class SqsQueue extends Queue implements QueueContract, ClearableQueue */ private $suffix; + /** + * Indicate that pushes should be delayed till after database transactions commit. + * + * @var bool + */ + protected $pushAfterCommits = false; + /** * Create a new Amazon SQS queue instance. * @@ -47,12 +54,17 @@ class SqsQueue extends Queue implements QueueContract, ClearableQueue * @param string $suffix * @return void */ - public function __construct(SqsClient $sqs, $default, $prefix = '', $suffix = '') + public function __construct(SqsClient $sqs, + $default, + $prefix = '', + $suffix = '', + $pushAfterCommits = false) { $this->sqs = $sqs; $this->prefix = $prefix; $this->default = $default; $this->suffix = $suffix; + $this->pushAfterCommits = $pushAfterCommits; } /** diff --git a/tests/Integration/Queue/QueueConnectionTest.php b/tests/Integration/Queue/QueueConnectionTest.php index a58814f3a44c..83a6c38527e3 100644 --- a/tests/Integration/Queue/QueueConnectionTest.php +++ b/tests/Integration/Queue/QueueConnectionTest.php @@ -18,6 +18,7 @@ protected function getEnvironmentSetUp($app) { $app['config']->set('app.debug', 'true'); $app['config']->set('queue.default', 'sqs'); + $app['config']->set('queue.connections.sqs.after_commits', true); } protected function tearDown(): void From 4f7193e30569eea65fc22b1ed2e272aff49cac9c Mon Sep 17 00:00:00 2001 From: Mohamed Said Date: Mon, 23 Nov 2020 16:33:38 +0200 Subject: [PATCH 08/11] call enqueueUsing on push and later --- src/Illuminate/Queue/BeanstalkdQueue.php | 14 +++++----- src/Illuminate/Queue/DatabaseQueue.php | 24 +++++++++-------- src/Illuminate/Queue/RedisQueue.php | 34 +++++++++++++----------- src/Illuminate/Queue/SqsQueue.php | 14 +++++----- 4 files changed, 48 insertions(+), 38 deletions(-) diff --git a/src/Illuminate/Queue/BeanstalkdQueue.php b/src/Illuminate/Queue/BeanstalkdQueue.php index 804dac8c6c81..869c6dbe346c 100755 --- a/src/Illuminate/Queue/BeanstalkdQueue.php +++ b/src/Illuminate/Queue/BeanstalkdQueue.php @@ -89,7 +89,11 @@ public function size($queue = null) */ public function push($job, $data = '', $queue = null) { - return $this->pushRaw($this->createPayload($job, $this->getQueue($queue), $data), $queue); + $payload = $this->createPayload($job, $this->getQueue($queue), $data); + + return $this->enqueueUsing($this, function () use ($payload, $queue) { + return $this->pushRaw($payload, $queue); + }); } /** @@ -102,11 +106,9 @@ public function push($job, $data = '', $queue = null) */ public function pushRaw($payload, $queue = null, array $options = []) { - return $this->enqueueUsing($this, function () use ($payload, $queue) { - return $this->pheanstalk->useTube($this->getQueue($queue))->put( - $payload, Pheanstalk::DEFAULT_PRIORITY, Pheanstalk::DEFAULT_DELAY, $this->timeToRun - ); - }); + return $this->pheanstalk->useTube($this->getQueue($queue))->put( + $payload, Pheanstalk::DEFAULT_PRIORITY, Pheanstalk::DEFAULT_DELAY, $this->timeToRun + ); } /** diff --git a/src/Illuminate/Queue/DatabaseQueue.php b/src/Illuminate/Queue/DatabaseQueue.php index f19c4b962f45..6480d09e4ecd 100644 --- a/src/Illuminate/Queue/DatabaseQueue.php +++ b/src/Illuminate/Queue/DatabaseQueue.php @@ -92,9 +92,11 @@ public function size($queue = null) */ public function push($job, $data = '', $queue = null) { - return $this->pushToDatabase($queue, $this->createPayload( - $job, $this->getQueue($queue), $data - )); + $payload = $this->createPayload($job, $this->getQueue($queue), $data); + + return $this->enqueueUsing($this, function () use ($queue, $payload) { + return $this->pushToDatabase($queue, $payload); + }); } /** @@ -121,9 +123,11 @@ public function pushRaw($payload, $queue = null, array $options = []) */ public function later($delay, $job, $data = '', $queue = null) { - return $this->pushToDatabase($queue, $this->createPayload( - $job, $this->getQueue($queue), $data - ), $delay); + $payload = $this->createPayload($job, $this->getQueue($queue), $data); + + return $this->enqueueUsing($this, function () use ($queue, $delay, $payload) { + return $this->pushToDatabase($queue, $payload, $delay); + }); } /** @@ -171,11 +175,9 @@ public function release($queue, $job, $delay) */ protected function pushToDatabase($queue, $payload, $delay = 0, $attempts = 0) { - return $this->enqueueUsing($this, function () use ($queue, $payload, $delay, $attempts) { - return $this->database->table($this->table)->insertGetId($this->buildDatabaseRecord( - $this->getQueue($queue), $payload, $this->availableAt($delay), $attempts - )); - }); + return $this->database->table($this->table)->insertGetId($this->buildDatabaseRecord( + $this->getQueue($queue), $payload, $this->availableAt($delay), $attempts + )); } /** diff --git a/src/Illuminate/Queue/RedisQueue.php b/src/Illuminate/Queue/RedisQueue.php index 193da7190a91..3096de29182b 100644 --- a/src/Illuminate/Queue/RedisQueue.php +++ b/src/Illuminate/Queue/RedisQueue.php @@ -122,7 +122,11 @@ public function bulk($jobs, $data = '', $queue = null) */ public function push($job, $data = '', $queue = null) { - return $this->pushRaw($this->createPayload($job, $this->getQueue($queue), $data), $queue); + $payload = $this->createPayload($job, $this->getQueue($queue), $data); + + return $this->enqueueUsing($this, function () use ($payload, $queue) { + return $this->pushRaw($payload, $queue); + }); } /** @@ -135,14 +139,12 @@ public function push($job, $data = '', $queue = null) */ public function pushRaw($payload, $queue = null, array $options = []) { - return $this->enqueueUsing($this, function () use ($payload, $queue) { - $this->getConnection()->eval( - LuaScripts::push(), 2, $this->getQueue($queue), - $this->getQueue($queue).':notify', $payload - ); + $this->getConnection()->eval( + LuaScripts::push(), 2, $this->getQueue($queue), + $this->getQueue($queue).':notify', $payload + ); - return json_decode($payload, true)['id'] ?? null; - }); + return json_decode($payload, true)['id'] ?? null; } /** @@ -156,7 +158,11 @@ public function pushRaw($payload, $queue = null, array $options = []) */ public function later($delay, $job, $data = '', $queue = null) { - return $this->laterRaw($delay, $this->createPayload($job, $this->getQueue($queue), $data), $queue); + $payload = $this->createPayload($job, $this->getQueue($queue), $data); + + return $this->enqueueUsing($this, function () use ($payload, $delay, $queue) { + return $this->laterRaw($delay, $payload, $queue); + }); } /** @@ -169,13 +175,11 @@ public function later($delay, $job, $data = '', $queue = null) */ protected function laterRaw($delay, $payload, $queue = null) { - return $this->enqueueUsing($this, function () use ($delay, $payload, $queue) { - $this->getConnection()->zadd( - $this->getQueue($queue).':delayed', $this->availableAt($delay), $payload - ); + $this->getConnection()->zadd( + $this->getQueue($queue).':delayed', $this->availableAt($delay), $payload + ); - return json_decode($payload, true)['id'] ?? null; - }); + return json_decode($payload, true)['id'] ?? null; } /** diff --git a/src/Illuminate/Queue/SqsQueue.php b/src/Illuminate/Queue/SqsQueue.php index 97d6d2353e42..00acb96c75f4 100755 --- a/src/Illuminate/Queue/SqsQueue.php +++ b/src/Illuminate/Queue/SqsQueue.php @@ -95,7 +95,11 @@ public function size($queue = null) */ public function push($job, $data = '', $queue = null) { - return $this->pushRaw($this->createPayload($job, $queue ?: $this->default, $data), $queue); + $payload = $this->createPayload($job, $queue ?: $this->default, $data); + + return $this->enqueueUsing($this, function () use ($payload, $queue) { + return $this->pushRaw($payload, $queue); + }); } /** @@ -108,11 +112,9 @@ public function push($job, $data = '', $queue = null) */ public function pushRaw($payload, $queue = null, array $options = []) { - return $this->enqueueUsing($this, function () use ($payload, $queue) { - return $this->sqs->sendMessage([ - 'QueueUrl' => $this->getQueue($queue), 'MessageBody' => $payload, - ])->get('MessageId'); - }); + return $this->sqs->sendMessage([ + 'QueueUrl' => $this->getQueue($queue), 'MessageBody' => $payload, + ])->get('MessageId'); } /** From 9346d5b2610f557c08fd8e5f1dc9aa502e0d912e Mon Sep 17 00:00:00 2001 From: Mohamed Said Date: Tue, 24 Nov 2020 14:55:57 +0200 Subject: [PATCH 09/11] allow setting dispatching within transactions behaviour on job level --- src/Illuminate/Queue/BeanstalkdQueue.php | 10 ++++----- src/Illuminate/Queue/DatabaseQueue.php | 10 ++++----- src/Illuminate/Queue/Queue.php | 26 ++++++++++++++++++++++-- src/Illuminate/Queue/RedisQueue.php | 10 ++++----- src/Illuminate/Queue/SqsQueue.php | 10 ++++----- 5 files changed, 44 insertions(+), 22 deletions(-) diff --git a/src/Illuminate/Queue/BeanstalkdQueue.php b/src/Illuminate/Queue/BeanstalkdQueue.php index 869c6dbe346c..2a6f9e716736 100755 --- a/src/Illuminate/Queue/BeanstalkdQueue.php +++ b/src/Illuminate/Queue/BeanstalkdQueue.php @@ -42,7 +42,7 @@ class BeanstalkdQueue extends Queue implements QueueContract * * @var bool */ - protected $pushAfterCommits = false; + protected $dispatchAfterTransaction = false; /** * Create a new Beanstalkd queue instance. @@ -57,13 +57,13 @@ public function __construct(Pheanstalk $pheanstalk, $default, $timeToRun, $blockFor = 0, - $pushAfterCommits = false) + $dispatchAfterTransaction = false) { $this->default = $default; $this->blockFor = $blockFor; $this->timeToRun = $timeToRun; $this->pheanstalk = $pheanstalk; - $this->pushAfterCommits = $pushAfterCommits; + $this->dispatchAfterTransaction = $dispatchAfterTransaction; } /** @@ -91,7 +91,7 @@ public function push($job, $data = '', $queue = null) { $payload = $this->createPayload($job, $this->getQueue($queue), $data); - return $this->enqueueUsing($this, function () use ($payload, $queue) { + return $this->enqueueUsing($this, $job, function () use ($payload, $queue) { return $this->pushRaw($payload, $queue); }); } @@ -122,7 +122,7 @@ public function pushRaw($payload, $queue = null, array $options = []) */ public function later($delay, $job, $data = '', $queue = null) { - return $this->enqueueUsing($this, function () use ($delay, $job, $data, $queue) { + return $this->enqueueUsing($this, $job, function () use ($delay, $job, $data, $queue) { $pheanstalk = $this->pheanstalk->useTube($this->getQueue($queue)); return $pheanstalk->put( diff --git a/src/Illuminate/Queue/DatabaseQueue.php b/src/Illuminate/Queue/DatabaseQueue.php index 6480d09e4ecd..815c849cb29f 100644 --- a/src/Illuminate/Queue/DatabaseQueue.php +++ b/src/Illuminate/Queue/DatabaseQueue.php @@ -45,7 +45,7 @@ class DatabaseQueue extends Queue implements QueueContract, ClearableQueue * * @var bool */ - protected $pushAfterCommits = false; + protected $dispatchAfterTransaction = false; /** * Create a new database queue instance. @@ -60,13 +60,13 @@ public function __construct(Connection $database, $table, $default = 'default', $retryAfter = 60, - $pushAfterCommits = false) + $dispatchAfterTransaction = false) { $this->table = $table; $this->default = $default; $this->database = $database; $this->retryAfter = $retryAfter; - $this->pushAfterCommits = $pushAfterCommits; + $this->dispatchAfterTransaction = $dispatchAfterTransaction; } /** @@ -94,7 +94,7 @@ public function push($job, $data = '', $queue = null) { $payload = $this->createPayload($job, $this->getQueue($queue), $data); - return $this->enqueueUsing($this, function () use ($queue, $payload) { + return $this->enqueueUsing($this, $job, function () use ($queue, $payload) { return $this->pushToDatabase($queue, $payload); }); } @@ -125,7 +125,7 @@ public function later($delay, $job, $data = '', $queue = null) { $payload = $this->createPayload($job, $this->getQueue($queue), $data); - return $this->enqueueUsing($this, function () use ($queue, $delay, $payload) { + return $this->enqueueUsing($this, $job, function () use ($queue, $delay, $payload) { return $this->pushToDatabase($queue, $payload, $delay); }); } diff --git a/src/Illuminate/Queue/Queue.php b/src/Illuminate/Queue/Queue.php index 016601675acc..fd6dc45c9b80 100755 --- a/src/Illuminate/Queue/Queue.php +++ b/src/Illuminate/Queue/Queue.php @@ -260,12 +260,14 @@ protected function withCreatePayloadHooks($queue, array $payload) * Enqueue a jobs using the given callback. * * @param \Illuminate\Contracts\Queue\Queue $connection + * @param \Closure|string|object $job * @param callable $callback * @return mixed */ - protected function enqueueUsing($connection, $callback) + protected function enqueueUsing($connection, $job, $callback) { - if ($connection->pushAfterCommits ?? false && Connection::$totalTransactions > 0) { + if (Connection::$totalTransactions > 0 && + $this->shouldDispatchAfterTransactions($connection, $job)) { Connection::$afterTransactionCallbacks[] = function () use ($callback) { $callback(); }; @@ -309,4 +311,24 @@ public function setContainer(Container $container) { $this->container = $container; } + + /** + * Determine if the job should be dispatched after database transactions. + * + * @param \Illuminate\Contracts\Queue\Queue $connection + * @param \Closure|string|object $job + * @return bool + */ + protected function shouldDispatchAfterTransactions($connection, $job) + { + if (is_object($job) && isset($job->dispatchAfterTransaction)) { + return $job->dispatchAfterTransaction; + } + + if (isset($connection->dispatchAfterTransaction)) { + return $connection->dispatchAfterTransaction; + } + + return false; + } } diff --git a/src/Illuminate/Queue/RedisQueue.php b/src/Illuminate/Queue/RedisQueue.php index 3096de29182b..fa6cb0861738 100644 --- a/src/Illuminate/Queue/RedisQueue.php +++ b/src/Illuminate/Queue/RedisQueue.php @@ -51,7 +51,7 @@ class RedisQueue extends Queue implements QueueContract, ClearableQueue * * @var bool */ - protected $pushAfterCommits = false; + protected $dispatchAfterTransaction = false; /** * Create a new Redis queue instance. @@ -68,14 +68,14 @@ public function __construct(Redis $redis, $connection = null, $retryAfter = 60, $blockFor = null, - $pushAfterCommits = false) + $dispatchAfterTransaction = false) { $this->redis = $redis; $this->default = $default; $this->blockFor = $blockFor; $this->connection = $connection; $this->retryAfter = $retryAfter; - $this->pushAfterCommits = $pushAfterCommits; + $this->dispatchAfterTransaction = $dispatchAfterTransaction; } /** @@ -124,7 +124,7 @@ public function push($job, $data = '', $queue = null) { $payload = $this->createPayload($job, $this->getQueue($queue), $data); - return $this->enqueueUsing($this, function () use ($payload, $queue) { + return $this->enqueueUsing($this, $job, function () use ($payload, $queue) { return $this->pushRaw($payload, $queue); }); } @@ -160,7 +160,7 @@ public function later($delay, $job, $data = '', $queue = null) { $payload = $this->createPayload($job, $this->getQueue($queue), $data); - return $this->enqueueUsing($this, function () use ($payload, $delay, $queue) { + return $this->enqueueUsing($this, $job, function () use ($payload, $delay, $queue) { return $this->laterRaw($delay, $payload, $queue); }); } diff --git a/src/Illuminate/Queue/SqsQueue.php b/src/Illuminate/Queue/SqsQueue.php index 00acb96c75f4..5736c50c58be 100755 --- a/src/Illuminate/Queue/SqsQueue.php +++ b/src/Illuminate/Queue/SqsQueue.php @@ -43,7 +43,7 @@ class SqsQueue extends Queue implements QueueContract, ClearableQueue * * @var bool */ - protected $pushAfterCommits = false; + protected $dispatchAfterTransaction = false; /** * Create a new Amazon SQS queue instance. @@ -58,13 +58,13 @@ public function __construct(SqsClient $sqs, $default, $prefix = '', $suffix = '', - $pushAfterCommits = false) + $dispatchAfterTransaction = false) { $this->sqs = $sqs; $this->prefix = $prefix; $this->default = $default; $this->suffix = $suffix; - $this->pushAfterCommits = $pushAfterCommits; + $this->dispatchAfterTransaction = $dispatchAfterTransaction; } /** @@ -97,7 +97,7 @@ public function push($job, $data = '', $queue = null) { $payload = $this->createPayload($job, $queue ?: $this->default, $data); - return $this->enqueueUsing($this, function () use ($payload, $queue) { + return $this->enqueueUsing($this, $job, function () use ($payload, $queue) { return $this->pushRaw($payload, $queue); }); } @@ -128,7 +128,7 @@ public function pushRaw($payload, $queue = null, array $options = []) */ public function later($delay, $job, $data = '', $queue = null) { - return $this->enqueueUsing($this, function () use ($data, $delay, $job, $queue) { + return $this->enqueueUsing($this, $job, function () use ($data, $delay, $job, $queue) { return $this->sqs->sendMessage([ 'QueueUrl' => $this->getQueue($queue), 'MessageBody' => $this->createPayload($job, $queue ?: $this->default, $data), From eba9e510a5ea2939a47e63ed219d5ef94bc7e4e8 Mon Sep 17 00:00:00 2001 From: Mohamed Said Date: Tue, 24 Nov 2020 15:58:03 +0200 Subject: [PATCH 10/11] add afterTransactions helpers --- src/Illuminate/Bus/Queueable.php | 20 +++++++++++++++++++ .../Foundation/Bus/PendingDispatch.php | 13 ++++++++++++ src/Illuminate/Queue/BeanstalkdQueue.php | 7 ++++--- src/Illuminate/Queue/DatabaseQueue.php | 7 ++++--- src/Illuminate/Queue/Queue.php | 8 ++++---- src/Illuminate/Queue/RedisQueue.php | 7 ++++--- src/Illuminate/Queue/SqsQueue.php | 7 ++++--- 7 files changed, 53 insertions(+), 16 deletions(-) diff --git a/src/Illuminate/Bus/Queueable.php b/src/Illuminate/Bus/Queueable.php index c2520b98c040..0591edf519cc 100644 --- a/src/Illuminate/Bus/Queueable.php +++ b/src/Illuminate/Bus/Queueable.php @@ -52,6 +52,13 @@ trait Queueable */ public $delay; + /** + * Indicate the job should be dispatched after database transactions. + * + * @var bool|null + */ + public $dispatchAfterTransactions; + /** * The middleware the job should be dispatched through. * @@ -133,6 +140,19 @@ public function delay($delay) return $this; } + /** + * Indicate that the job should be dispatched after database transactions. + * + * @param bool|null $value + * @return $this + */ + public function afterTransactions($value = true) + { + $this->dispatchAfterTransactions = $value; + + return $this; + } + /** * Specify the middleware the job should be dispatched through. * diff --git a/src/Illuminate/Foundation/Bus/PendingDispatch.php b/src/Illuminate/Foundation/Bus/PendingDispatch.php index 9495dc15c114..55afb84ec1dd 100644 --- a/src/Illuminate/Foundation/Bus/PendingDispatch.php +++ b/src/Illuminate/Foundation/Bus/PendingDispatch.php @@ -99,6 +99,19 @@ public function delay($delay) return $this; } + /** + * Indicate that the job should be dispatched after database transactions. + * + * @param bool|null $value + * @return $this + */ + public function afterTransactions($value = true) + { + $this->job->afterTransactions($value); + + return $this; + } + /** * Set the jobs that should run if this job is successful. * diff --git a/src/Illuminate/Queue/BeanstalkdQueue.php b/src/Illuminate/Queue/BeanstalkdQueue.php index 2a6f9e716736..30e773b51645 100755 --- a/src/Illuminate/Queue/BeanstalkdQueue.php +++ b/src/Illuminate/Queue/BeanstalkdQueue.php @@ -42,7 +42,7 @@ class BeanstalkdQueue extends Queue implements QueueContract * * @var bool */ - protected $dispatchAfterTransaction = false; + protected $dispatchAfterTransactions = false; /** * Create a new Beanstalkd queue instance. @@ -51,19 +51,20 @@ class BeanstalkdQueue extends Queue implements QueueContract * @param string $default * @param int $timeToRun * @param int $blockFor + * @param bool $dispatchAfterTransactions * @return void */ public function __construct(Pheanstalk $pheanstalk, $default, $timeToRun, $blockFor = 0, - $dispatchAfterTransaction = false) + $dispatchAfterTransactions = false) { $this->default = $default; $this->blockFor = $blockFor; $this->timeToRun = $timeToRun; $this->pheanstalk = $pheanstalk; - $this->dispatchAfterTransaction = $dispatchAfterTransaction; + $this->dispatchAfterTransactions = $dispatchAfterTransactions; } /** diff --git a/src/Illuminate/Queue/DatabaseQueue.php b/src/Illuminate/Queue/DatabaseQueue.php index 815c849cb29f..af08302ecd98 100644 --- a/src/Illuminate/Queue/DatabaseQueue.php +++ b/src/Illuminate/Queue/DatabaseQueue.php @@ -45,7 +45,7 @@ class DatabaseQueue extends Queue implements QueueContract, ClearableQueue * * @var bool */ - protected $dispatchAfterTransaction = false; + protected $dispatchAfterTransactions = false; /** * Create a new database queue instance. @@ -54,19 +54,20 @@ class DatabaseQueue extends Queue implements QueueContract, ClearableQueue * @param string $table * @param string $default * @param int $retryAfter + * @param bool $dispatchAfterTransactions * @return void */ public function __construct(Connection $database, $table, $default = 'default', $retryAfter = 60, - $dispatchAfterTransaction = false) + $dispatchAfterTransactions = false) { $this->table = $table; $this->default = $default; $this->database = $database; $this->retryAfter = $retryAfter; - $this->dispatchAfterTransaction = $dispatchAfterTransaction; + $this->dispatchAfterTransactions = $dispatchAfterTransactions; } /** diff --git a/src/Illuminate/Queue/Queue.php b/src/Illuminate/Queue/Queue.php index fd6dc45c9b80..83faa2dc8b60 100755 --- a/src/Illuminate/Queue/Queue.php +++ b/src/Illuminate/Queue/Queue.php @@ -321,12 +321,12 @@ public function setContainer(Container $container) */ protected function shouldDispatchAfterTransactions($connection, $job) { - if (is_object($job) && isset($job->dispatchAfterTransaction)) { - return $job->dispatchAfterTransaction; + if (is_object($job) && isset($job->dispatchAfterTransactions)) { + return $job->dispatchAfterTransactions; } - if (isset($connection->dispatchAfterTransaction)) { - return $connection->dispatchAfterTransaction; + if (isset($connection->dispatchAfterTransactions)) { + return $connection->dispatchAfterTransactions; } return false; diff --git a/src/Illuminate/Queue/RedisQueue.php b/src/Illuminate/Queue/RedisQueue.php index fa6cb0861738..1260f7b5a43b 100644 --- a/src/Illuminate/Queue/RedisQueue.php +++ b/src/Illuminate/Queue/RedisQueue.php @@ -51,7 +51,7 @@ class RedisQueue extends Queue implements QueueContract, ClearableQueue * * @var bool */ - protected $dispatchAfterTransaction = false; + protected $dispatchAfterTransactions = false; /** * Create a new Redis queue instance. @@ -61,6 +61,7 @@ class RedisQueue extends Queue implements QueueContract, ClearableQueue * @param string|null $connection * @param int $retryAfter * @param int|null $blockFor + * @param bool $dispatchAfterTransactions * @return void */ public function __construct(Redis $redis, @@ -68,14 +69,14 @@ public function __construct(Redis $redis, $connection = null, $retryAfter = 60, $blockFor = null, - $dispatchAfterTransaction = false) + $dispatchAfterTransactions = false) { $this->redis = $redis; $this->default = $default; $this->blockFor = $blockFor; $this->connection = $connection; $this->retryAfter = $retryAfter; - $this->dispatchAfterTransaction = $dispatchAfterTransaction; + $this->dispatchAfterTransactions = $dispatchAfterTransactions; } /** diff --git a/src/Illuminate/Queue/SqsQueue.php b/src/Illuminate/Queue/SqsQueue.php index 5736c50c58be..3ad59fc7409b 100755 --- a/src/Illuminate/Queue/SqsQueue.php +++ b/src/Illuminate/Queue/SqsQueue.php @@ -43,7 +43,7 @@ class SqsQueue extends Queue implements QueueContract, ClearableQueue * * @var bool */ - protected $dispatchAfterTransaction = false; + protected $dispatchAfterTransactions = false; /** * Create a new Amazon SQS queue instance. @@ -52,19 +52,20 @@ class SqsQueue extends Queue implements QueueContract, ClearableQueue * @param string $default * @param string $prefix * @param string $suffix + * @param bool $dispatchAfterTransactions * @return void */ public function __construct(SqsClient $sqs, $default, $prefix = '', $suffix = '', - $dispatchAfterTransaction = false) + $dispatchAfterTransactions = false) { $this->sqs = $sqs; $this->prefix = $prefix; $this->default = $default; $this->suffix = $suffix; - $this->dispatchAfterTransaction = $dispatchAfterTransaction; + $this->dispatchAfterTransactions = $dispatchAfterTransactions; } /** From 95a9f484b9b2e3f0fcb85accd1620ef305d4ef63 Mon Sep 17 00:00:00 2001 From: Mohamed Said Date: Tue, 24 Nov 2020 16:24:41 +0200 Subject: [PATCH 11/11] support dispatchAfterTransactions in events, mail, notifications, listeners, and broadcasted events --- src/Illuminate/Broadcasting/BroadcastEvent.php | 1 + src/Illuminate/Events/Dispatcher.php | 1 + src/Illuminate/Mail/SendQueuedMailable.php | 1 + src/Illuminate/Notifications/NotificationSender.php | 1 + 4 files changed, 4 insertions(+) diff --git a/src/Illuminate/Broadcasting/BroadcastEvent.php b/src/Illuminate/Broadcasting/BroadcastEvent.php index 775df78059d7..da8c374a234c 100644 --- a/src/Illuminate/Broadcasting/BroadcastEvent.php +++ b/src/Illuminate/Broadcasting/BroadcastEvent.php @@ -46,6 +46,7 @@ public function __construct($event) $this->event = $event; $this->tries = property_exists($event, 'tries') ? $event->tries : null; $this->timeout = property_exists($event, 'timeout') ? $event->timeout : null; + $this->dispatchAfterTransactions = property_exists($event, 'dispatchAfterTransactions') ? $event->dispatchAfterTransactions : null; } /** diff --git a/src/Illuminate/Events/Dispatcher.php b/src/Illuminate/Events/Dispatcher.php index 91214b457188..18091f9a35e9 100755 --- a/src/Illuminate/Events/Dispatcher.php +++ b/src/Illuminate/Events/Dispatcher.php @@ -559,6 +559,7 @@ protected function propagateListenerOptions($listener, $job) $job->timeout = $listener->timeout ?? null; $job->retryUntil = method_exists($listener, 'retryUntil') ? $listener->retryUntil() : null; + $job->dispatchAfterTransactions = $listener->dispatchAfterTransactions ?? null; }); } diff --git a/src/Illuminate/Mail/SendQueuedMailable.php b/src/Illuminate/Mail/SendQueuedMailable.php index 76822bcd05f5..8f579a78e3ee 100644 --- a/src/Illuminate/Mail/SendQueuedMailable.php +++ b/src/Illuminate/Mail/SendQueuedMailable.php @@ -39,6 +39,7 @@ public function __construct(MailableContract $mailable) $this->mailable = $mailable; $this->tries = property_exists($mailable, 'tries') ? $mailable->tries : null; $this->timeout = property_exists($mailable, 'timeout') ? $mailable->timeout : null; + $this->dispatchAfterTransactions = property_exists($mailable, 'dispatchAfterTransactions') ? $mailable->dispatchAfterTransactions : null; } /** diff --git a/src/Illuminate/Notifications/NotificationSender.php b/src/Illuminate/Notifications/NotificationSender.php index 1d6c424b1512..1abd61eb8eb4 100644 --- a/src/Illuminate/Notifications/NotificationSender.php +++ b/src/Illuminate/Notifications/NotificationSender.php @@ -203,6 +203,7 @@ protected function queueNotification($notifiables, $notification) ->onConnection($notification->connection) ->onQueue($queue) ->delay($notification->delay) + ->afterTransactions($notification->dispatchAfterTransactions) ->through( array_merge( method_exists($notification, 'middleware') ? $notification->middleware() : [],