Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[8.x] Delay pushing jobs to queue until database transactions are committed #35266

Closed
wants to merge 13 commits into from
1 change: 1 addition & 0 deletions src/Illuminate/Broadcasting/BroadcastEvent.php
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ public function __construct($event)
$this->event = $event;
$this->tries = property_exists($event, 'tries') ? $event->tries : null;
$this->timeout = property_exists($event, 'timeout') ? $event->timeout : null;
$this->dispatchAfterTransactions = property_exists($event, 'dispatchAfterTransactions') ? $event->dispatchAfterTransactions : null;
}

/**
Expand Down
20 changes: 20 additions & 0 deletions src/Illuminate/Bus/Queueable.php
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,13 @@ trait Queueable
*/
public $delay;

/**
* Indicate the job should be dispatched after database transactions.
*
* @var bool|null
*/
public $dispatchAfterTransactions;

/**
* The middleware the job should be dispatched through.
*
Expand Down Expand Up @@ -133,6 +140,19 @@ public function delay($delay)
return $this;
}

/**
* Indicate that the job should be dispatched after database transactions.
*
* @param bool|null $value
* @return $this
*/
public function afterTransactions($value = true)
{
$this->dispatchAfterTransactions = $value;

return $this;
}

/**
* Specify the middleware the job should be dispatched through.
*
Expand Down
75 changes: 67 additions & 8 deletions src/Illuminate/Database/Concerns/ManagesTransactions.php
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ public function transaction(Closure $callback, $attempts = 1)
$this->getPdo()->commit();
}

$this->transactions = max(0, $this->transactions - 1);
$this->decrementTransactionsCount();
} catch (Throwable $e) {
$this->handleCommitTransactionException(
$e, $currentAttempt, $attempts
Expand Down Expand Up @@ -76,7 +76,7 @@ protected function handleTransactionException(Throwable $e, $currentAttempt, $ma
// let the developer handle it in another way. We will decrement too.
if ($this->causedByConcurrencyError($e) &&
$this->transactions > 1) {
$this->transactions--;
$this->decrementTransactionsCount();

throw $e;
}
Expand Down Expand Up @@ -105,7 +105,7 @@ public function beginTransaction()
{
$this->createTransaction();

$this->transactions++;
$this->incrementTransactionsCount();

$this->fireConnectionEvent('beganTransaction');
}
Expand Down Expand Up @@ -178,7 +178,7 @@ public function commit()
$this->getPdo()->commit();
}

$this->transactions = max(0, $this->transactions - 1);
$this->decrementTransactionsCount();

$this->fireConnectionEvent('committed');
}
Expand All @@ -195,15 +195,15 @@ public function commit()
*/
protected function handleCommitTransactionException(Throwable $e, $currentAttempt, $maxAttempts)
{
$this->transactions = max(0, $this->transactions - 1);
$this->decrementTransactionsCount();

if ($this->causedByConcurrencyError($e) &&
$currentAttempt < $maxAttempts) {
return;
}

if ($this->causedByLostConnection($e)) {
$this->transactions = 0;
$this->updateTransactionsCount(0);
}

throw $e;
Expand Down Expand Up @@ -239,7 +239,7 @@ public function rollBack($toLevel = null)
$this->handleRollBackException($e);
}

$this->transactions = $toLevel;
$this->updateTransactionsCount($toLevel);

$this->fireConnectionEvent('rollingBack');
}
Expand Down Expand Up @@ -274,7 +274,7 @@ protected function performRollBack($toLevel)
protected function handleRollBackException(Throwable $e)
{
if ($this->causedByLostConnection($e)) {
$this->transactions = 0;
$this->updateTransactionsCount(0);
}

throw $e;
Expand All @@ -289,4 +289,63 @@ public function transactionLevel()
{
return $this->transactions;
}

/**
* Increment the number of transactions.
*
* @return void
*/
protected function incrementTransactionsCount()
{
$this->transactions++;

static::$totalTransactions++;
}

/**
* Decrement the number of transactions.
*
* @return void
*/
protected function decrementTransactionsCount()
{
$this->transactions = max(0, $this->transactions - 1);

static::$totalTransactions = max(0, static::$totalTransactions - 1);

if (static::$totalTransactions == 0) {
$this->callAfterTransactionCallbacks();
}
}

/**
* Update the number of transactions.
*
* @param int $count
* @return void
*/
protected function updateTransactionsCount($count)
{
static::$totalTransactions = static::$totalTransactions + $count - $this->transactions;

$this->transactions = $count;

if (static::$totalTransactions == 0) {
$this->callAfterTransactionCallbacks();
}
}

/**
* Execute the after transaction callbacks.
*
* @return void
*/
protected function callAfterTransactionCallbacks()
{
foreach (static::$afterTransactionCallbacks as $callback) {
call_user_func($callback);
}

static::$afterTransactionCallbacks = [];
}
}
14 changes: 14 additions & 0 deletions src/Illuminate/Database/Connection.php
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,20 @@ class Connection implements ConnectionInterface
*/
protected $transactions = 0;

/**
* The total number of active transactions.
*
* @var int
*/
public static $totalTransactions = 0;

/**
* The callbacks to invoke after transactions.
*
* @var callable[]
*/
public static $afterTransactionCallbacks = [];

/**
* Indicates if changes have been made to the database.
*
Expand Down
1 change: 1 addition & 0 deletions src/Illuminate/Events/Dispatcher.php
Original file line number Diff line number Diff line change
Expand Up @@ -559,6 +559,7 @@ protected function propagateListenerOptions($listener, $job)
$job->timeout = $listener->timeout ?? null;
$job->retryUntil = method_exists($listener, 'retryUntil')
? $listener->retryUntil() : null;
$job->dispatchAfterTransactions = $listener->dispatchAfterTransactions ?? null;
});
}

Expand Down
13 changes: 13 additions & 0 deletions src/Illuminate/Foundation/Bus/PendingDispatch.php
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,19 @@ public function delay($delay)
return $this;
}

/**
* Indicate that the job should be dispatched after database transactions.
*
* @param bool|null $value
* @return $this
*/
public function afterTransactions($value = true)
{
$this->job->afterTransactions($value);

return $this;
}

/**
* Set the jobs that should run if this job is successful.
*
Expand Down
1 change: 1 addition & 0 deletions src/Illuminate/Mail/SendQueuedMailable.php
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ public function __construct(MailableContract $mailable)
$this->mailable = $mailable;
$this->tries = property_exists($mailable, 'tries') ? $mailable->tries : null;
$this->timeout = property_exists($mailable, 'timeout') ? $mailable->timeout : null;
$this->dispatchAfterTransactions = property_exists($mailable, 'dispatchAfterTransactions') ? $mailable->dispatchAfterTransactions : null;
}

/**
Expand Down
1 change: 1 addition & 0 deletions src/Illuminate/Notifications/NotificationSender.php
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,7 @@ protected function queueNotification($notifiables, $notification)
($notification->delay[$channel] ?? null)
: $notification->delay
)
->afterTransactions($notification->dispatchAfterTransactions)
->through(
array_merge(
method_exists($notification, 'middleware') ? $notification->middleware() : [],
Expand Down
39 changes: 29 additions & 10 deletions src/Illuminate/Queue/BeanstalkdQueue.php
Original file line number Diff line number Diff line change
Expand Up @@ -37,21 +37,34 @@ class BeanstalkdQueue extends Queue implements QueueContract
*/
protected $blockFor;

/**
* Indicate that pushes should be delayed till after database transactions commit.
*
* @var bool
*/
protected $dispatchAfterTransactions = false;

/**
* Create a new Beanstalkd queue instance.
*
* @param \Pheanstalk\Pheanstalk $pheanstalk
* @param string $default
* @param int $timeToRun
* @param int $blockFor
* @param bool $dispatchAfterTransactions
* @return void
*/
public function __construct(Pheanstalk $pheanstalk, $default, $timeToRun, $blockFor = 0)
public function __construct(Pheanstalk $pheanstalk,
$default,
$timeToRun,
$blockFor = 0,
$dispatchAfterTransactions = false)
{
$this->default = $default;
$this->blockFor = $blockFor;
$this->timeToRun = $timeToRun;
$this->pheanstalk = $pheanstalk;
$this->dispatchAfterTransactions = $dispatchAfterTransactions;
}

/**
Expand All @@ -77,7 +90,11 @@ public function size($queue = null)
*/
public function push($job, $data = '', $queue = null)
{
return $this->pushRaw($this->createPayload($job, $this->getQueue($queue), $data), $queue);
$payload = $this->createPayload($job, $this->getQueue($queue), $data);

return $this->enqueueUsing($this, $job, function () use ($payload, $queue) {
return $this->pushRaw($payload, $queue);
});
}

/**
Expand Down Expand Up @@ -106,14 +123,16 @@ public function pushRaw($payload, $queue = null, array $options = [])
*/
public function later($delay, $job, $data = '', $queue = null)
{
$pheanstalk = $this->pheanstalk->useTube($this->getQueue($queue));

return $pheanstalk->put(
$this->createPayload($job, $this->getQueue($queue), $data),
Pheanstalk::DEFAULT_PRIORITY,
$this->secondsUntil($delay),
$this->timeToRun
);
return $this->enqueueUsing($this, $job, function () use ($delay, $job, $data, $queue) {
$pheanstalk = $this->pheanstalk->useTube($this->getQueue($queue));

return $pheanstalk->put(
$this->createPayload($job, $this->getQueue($queue), $data),
Pheanstalk::DEFAULT_PRIORITY,
$this->secondsUntil($delay),
$this->timeToRun
);
});
}

/**
Expand Down
3 changes: 2 additions & 1 deletion src/Illuminate/Queue/Connectors/BeanstalkdConnector.php
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@ public function connect(array $config)
$this->pheanstalk($config),
$config['queue'],
$config['retry_after'] ?? Pheanstalk::DEFAULT_TTR,
$config['block_for'] ?? 0
$config['block_for'] ?? 0,
$config['after_commits'] ?? false
);
}

Expand Down
3 changes: 2 additions & 1 deletion src/Illuminate/Queue/Connectors/DatabaseConnector.php
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,8 @@ public function connect(array $config)
$this->connections->connection($config['connection'] ?? null),
$config['table'],
$config['queue'],
$config['retry_after'] ?? 60
$config['retry_after'] ?? 60,
$config['after_commits'] ?? false
);
}
}
3 changes: 2 additions & 1 deletion src/Illuminate/Queue/Connectors/RedisConnector.php
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,8 @@ public function connect(array $config)
$this->redis, $config['queue'],
$config['connection'] ?? $this->connection,
$config['retry_after'] ?? 60,
$config['block_for'] ?? null
$config['block_for'] ?? null,
$config['after_commits'] ?? false
);
}
}
6 changes: 5 additions & 1 deletion src/Illuminate/Queue/Connectors/SqsConnector.php
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,11 @@ public function connect(array $config)
}

return new SqsQueue(
new SqsClient($config), $config['queue'], $config['prefix'] ?? '', $config['suffix'] ?? ''
new SqsClient($config),
$config['queue'],
$config['prefix'] ?? '',
$config['suffix'] ?? '',
$config['after_commits'] ?? false
);
}

Expand Down
Loading