Skip to content

Commit

Permalink
[10.x] Add progress option to PendingBatch (#49273)
Browse files Browse the repository at this point in the history
* Add `progress` option to pending batch

* Invoke `progress` callback also on failure
  • Loading branch information
orkhanahmadov authored Dec 12, 2023
1 parent 5654bdf commit 00c9c7f
Show file tree
Hide file tree
Showing 4 changed files with 70 additions and 2 deletions.
26 changes: 26 additions & 0 deletions src/Illuminate/Bus/Batch.php
Original file line number Diff line number Diff line change
Expand Up @@ -241,6 +241,14 @@ public function recordSuccessfulJob(string $jobId)
{
$counts = $this->decrementPendingJobs($jobId);

if ($this->hasProgressCallbacks()) {
$batch = $this->fresh();

collect($this->options['progress'])->each(function ($handler) use ($batch) {
$this->invokeHandlerCallback($handler, $batch);
});
}

if ($counts->pendingJobs === 0) {
$this->repository->markAsFinished($this->id);
}
Expand Down Expand Up @@ -283,6 +291,16 @@ public function finished()
return ! is_null($this->finishedAt);
}

/**
* Determine if the batch has "progress" callbacks.
*
* @return bool
*/
public function hasProgressCallbacks()
{
return isset($this->options['progress']) && ! empty($this->options['progress']);
}

/**
* Determine if the batch has "success" callbacks.
*
Expand Down Expand Up @@ -328,6 +346,14 @@ public function recordFailedJob(string $jobId, $e)
$this->cancel();
}

if ($this->hasProgressCallbacks() && $this->allowsFailures()) {
$batch = $this->fresh();

collect($this->options['progress'])->each(function ($handler) use ($batch, $e) {
$this->invokeHandlerCallback($handler, $batch, $e);
});
}

if ($counts->failedJobs === 1 && $this->hasCatchCallbacks()) {
$batch = $this->fresh();

Expand Down
25 changes: 25 additions & 0 deletions src/Illuminate/Bus/PendingBatch.php
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,31 @@ public function add($jobs)
return $this;
}

/**
* Add a callback to be executed after a job in the batch have executed successfully.
*
* @param callable $callback
* @return $this
*/
public function progress($callback)
{
$this->options['progress'][] = $callback instanceof Closure
? new SerializableClosure($callback)
: $callback;

return $this;
}

/**
* Get the "progress" callbacks that have been registered with the pending batch.
*
* @return array
*/
public function progressCallbacks()
{
return $this->options['progress'] ?? [];
}

/**
* Add a callback to be executed after all jobs in the batch have executed successfully.
*
Expand Down
16 changes: 15 additions & 1 deletion tests/Bus/BusBatchTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ protected function setUp(): void
$this->createSchema();

$_SERVER['__finally.count'] = 0;
$_SERVER['__progress.count'] = 0;
$_SERVER['__then.count'] = 0;
$_SERVER['__catch.count'] = 0;
}
Expand Down Expand Up @@ -72,7 +73,7 @@ public function createSchema()
*/
protected function tearDown(): void
{
unset($_SERVER['__finally.batch'], $_SERVER['__then.batch'], $_SERVER['__catch.batch'], $_SERVER['__catch.exception']);
unset($_SERVER['__finally.batch'], $_SERVER['__progress.batch'], $_SERVER['__then.batch'], $_SERVER['__catch.batch'], $_SERVER['__catch.exception']);

$this->schema()->drop('job_batches');

Expand Down Expand Up @@ -201,12 +202,14 @@ public function test_successful_jobs_can_be_recorded()
$batch->recordSuccessfulJob('test-id');

$this->assertInstanceOf(Batch::class, $_SERVER['__finally.batch']);
$this->assertInstanceOf(Batch::class, $_SERVER['__progress.batch']);
$this->assertInstanceOf(Batch::class, $_SERVER['__then.batch']);

$batch = $batch->fresh();
$this->assertEquals(0, $batch->pendingJobs);
$this->assertTrue($batch->finished());
$this->assertEquals(1, $_SERVER['__finally.count']);
$this->assertEquals(2, $_SERVER['__progress.count']);
$this->assertEquals(1, $_SERVER['__then.count']);
}

Expand Down Expand Up @@ -247,6 +250,7 @@ public function test_failed_jobs_can_be_recorded_while_not_allowing_failures()
$this->assertTrue($batch->finished());
$this->assertTrue($batch->cancelled());
$this->assertEquals(1, $_SERVER['__finally.count']);
$this->assertEquals(0, $_SERVER['__progress.count']);
$this->assertEquals(1, $_SERVER['__catch.count']);
$this->assertSame('Something went wrong.', $_SERVER['__catch.exception']->getMessage());
}
Expand Down Expand Up @@ -288,6 +292,7 @@ public function test_failed_jobs_can_be_recorded_while_allowing_failures()
$this->assertFalse($batch->finished());
$this->assertFalse($batch->cancelled());
$this->assertEquals(1, $_SERVER['__catch.count']);
$this->assertEquals(2, $_SERVER['__progress.count']);
$this->assertSame('Something went wrong.', $_SERVER['__catch.exception']->getMessage());
}

Expand Down Expand Up @@ -327,6 +332,11 @@ public function test_batch_state_can_be_inspected()
$batch->finishedAt = now();
$this->assertTrue($batch->finished());

$batch->options['progress'] = [];
$this->assertFalse($batch->hasProgressCallbacks());
$batch->options['progress'] = [1];
$this->assertTrue($batch->hasProgressCallbacks());

$batch->options['then'] = [];
$this->assertFalse($batch->hasThenCallbacks());
$batch->options['then'] = [1];
Expand Down Expand Up @@ -463,6 +473,10 @@ protected function createTestBatch($queue, $allowFailures = false)
$repository = new DatabaseBatchRepository(new BatchFactory($queue), DB::connection(), 'job_batches');

$pendingBatch = (new PendingBatch(new Container, collect()))
->progress(function (Batch $batch) {
$_SERVER['__progress.batch'] = $batch;
$_SERVER['__progress.count']++;
})
->then(function (Batch $batch) {
$_SERVER['__then.batch'] = $batch;
$_SERVER['__then.count']++;
Expand Down
5 changes: 4 additions & 1 deletion tests/Bus/BusPendingBatchTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -37,14 +37,17 @@ public function test_pending_batch_may_be_configured_and_dispatched()

$pendingBatch = new PendingBatch($container, new Collection([$job]));

$pendingBatch = $pendingBatch->then(function () {
$pendingBatch = $pendingBatch->progress(function () {
//
})->then(function () {
//
})->catch(function () {
//
})->allowFailures()->onConnection('test-connection')->onQueue('test-queue')->withOption('extra-option', 123);

$this->assertSame('test-connection', $pendingBatch->connection());
$this->assertSame('test-queue', $pendingBatch->queue());
$this->assertCount(1, $pendingBatch->progressCallbacks());
$this->assertCount(1, $pendingBatch->thenCallbacks());
$this->assertCount(1, $pendingBatch->catchCallbacks());
$this->assertArrayHasKey('extra-option', $pendingBatch->options);
Expand Down

0 comments on commit 00c9c7f

Please sign in to comment.