Skip to content

Commit

Permalink
Add ThrottlesExceptionsWithRedis job middleware
Browse files Browse the repository at this point in the history
  • Loading branch information
paras-malhotra committed Mar 9, 2021
1 parent 92a1ce8 commit 35071ab
Show file tree
Hide file tree
Showing 3 changed files with 283 additions and 0 deletions.
62 changes: 62 additions & 0 deletions src/Illuminate/Queue/Middleware/ThrottlesExceptionsWithRedis.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
<?php

namespace Illuminate\Queue\Middleware;

use Illuminate\Container\Container;
use Illuminate\Contracts\Redis\Factory as Redis;
use Illuminate\Redis\Limiters\DurationLimiter;
use Illuminate\Support\InteractsWithTime;
use Throwable;

class ThrottlesExceptionsWithRedis extends ThrottlesExceptions
{
use InteractsWithTime;

/**
* The Redis factory implementation.
*
* @var \Illuminate\Contracts\Redis\Factory
*/
protected $redis;

/**
* The rate limiter instance.
*
* @var \Illuminate\Redis\Limiters\DurationLimiter
*/
protected $limiter;

/**
* Process the job.
*
* @param mixed $job
* @param callable $next
* @return mixed
*/
public function handle($job, $next)
{
$this->redis = Container::getInstance()->make(Redis::class);

$this->limiter = new DurationLimiter(
$this->redis, $this->getKey($job), $this->maxAttempts, $this->decayMinutes * 60
);

if ($this->limiter->tooManyAttempts()) {
return $job->release($this->limiter->decaysAt - $this->currentTime());
}

try {
$next($job);

$this->limiter->clear();
} catch (Throwable $throwable) {
if ($this->whenCallback && ! call_user_func($this->whenCallback, $throwable)) {
throw $throwable;
}

$this->limiter->acquire();

return $job->release($this->retryAfterMinutes * 60);
}
}
}
54 changes: 54 additions & 0 deletions src/Illuminate/Redis/Limiters/DurationLimiter.php
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,30 @@ public function acquire()
return (bool) $results[0];
}

/**
* Determine if the key has been "accessed" too many times.
*
* @return bool
*/
public function tooManyAttempts()
{
[$this->decaysAt, $this->remaining] = $this->redis->eval(
$this->tooManyAttemptsScript(), 1, $this->name, microtime(true), time(), $this->decay, $this->maxLocks
);

return $this->remaining <= 0;
}

/**
* Clear the limiter.
*
* @return void
*/
public function clear()
{
$this->redis->del($this->name);
}

/**
* Get the Lua script for acquiring a lock.
*
Expand Down Expand Up @@ -143,6 +167,36 @@ protected function luaScript()
end
return {reset(), ARGV[2] + ARGV[3], ARGV[4] - 1}
LUA;
}

/**
* Get the Lua script to determine if the key has been "accessed" too many times.
*
* KEYS[1] - The limiter name
* ARGV[1] - Current time in microseconds
* ARGV[2] - Current time in seconds
* ARGV[3] - Duration of the bucket
* ARGV[4] - Allowed number of tasks
*
* @return string
*/
protected function tooManyAttemptsScript()
{
return <<<'LUA'
if redis.call('EXISTS', KEYS[1]) == 0 then
return {0, ARGV[2] + ARGV[3]}
end
if ARGV[1] >= redis.call('HGET', KEYS[1], 'start') and ARGV[1] <= redis.call('HGET', KEYS[1], 'end') then
return {
redis.call('HGET', KEYS[1], 'end'),
ARGV[4] - redis.call('HGET', KEYS[1], 'count')
}
end
return {0, ARGV[2] + ARGV[3]}
LUA;
}
}
167 changes: 167 additions & 0 deletions tests/Integration/Queue/ThrottlesExceptionsWithRedisTest.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,167 @@
<?php

namespace Illuminate\Tests\Integration\Queue;

use Exception;
use Illuminate\Bus\Dispatcher;
use Illuminate\Bus\Queueable;
use Illuminate\Contracts\Queue\Job;
use Illuminate\Foundation\Testing\Concerns\InteractsWithRedis;
use Illuminate\Queue\CallQueuedHandler;
use Illuminate\Queue\InteractsWithQueue;
use Illuminate\Queue\Middleware\ThrottlesExceptionsWithRedis;
use Illuminate\Support\Str;
use Mockery as m;
use Orchestra\Testbench\TestCase;

/**
* @group integration
*/
class ThrottlesExceptionsWithRedisTest extends TestCase
{
use InteractsWithRedis;

protected function setUp(): void
{
parent::setUp();

$this->setUpRedis();
}

protected function tearDown(): void
{
parent::tearDown();

$this->tearDownRedis();

m::close();
}

public function testCircuitIsOpenedForJobErrors()
{
$this->assertJobWasReleasedImmediately(CircuitBreakerWithRedisTestJob::class, $key = Str::random());
$this->assertJobWasReleasedImmediately(CircuitBreakerWithRedisTestJob::class, $key);
$this->assertJobWasReleasedWithDelay(CircuitBreakerWithRedisTestJob::class, $key);
}

public function testCircuitStaysClosedForSuccessfulJobs()
{
$this->assertJobRanSuccessfully(CircuitBreakerWithRedisSuccessfulJob::class, $key = Str::random());
$this->assertJobRanSuccessfully(CircuitBreakerWithRedisSuccessfulJob::class, $key);
$this->assertJobRanSuccessfully(CircuitBreakerWithRedisSuccessfulJob::class, $key);
}

public function testCircuitResetsAfterSuccess()
{
$this->assertJobWasReleasedImmediately(CircuitBreakerWithRedisTestJob::class, $key = Str::random());
$this->assertJobRanSuccessfully(CircuitBreakerWithRedisSuccessfulJob::class, $key);
$this->assertJobWasReleasedImmediately(CircuitBreakerWithRedisTestJob::class, $key);
$this->assertJobWasReleasedImmediately(CircuitBreakerWithRedisTestJob::class, $key);
$this->assertJobWasReleasedWithDelay(CircuitBreakerWithRedisTestJob::class, $key);
}

protected function assertJobWasReleasedImmediately($class, $key)
{
$class::$handled = false;
$instance = new CallQueuedHandler(new Dispatcher($this->app), $this->app);

$job = m::mock(Job::class);

$job->shouldReceive('hasFailed')->once()->andReturn(false);
$job->shouldReceive('release')->with(0)->once();
$job->shouldReceive('isReleased')->andReturn(true);
$job->shouldReceive('isDeletedOrReleased')->once()->andReturn(true);

$instance->call($job, [
'command' => serialize($command = new $class($key)),
]);

$this->assertTrue($class::$handled);
}

protected function assertJobWasReleasedWithDelay($class, $key)
{
$class::$handled = false;
$instance = new CallQueuedHandler(new Dispatcher($this->app), $this->app);

$job = m::mock(Job::class);

$job->shouldReceive('hasFailed')->once()->andReturn(false);
$job->shouldReceive('release')->withArgs(function ($delay) {
return $delay >= 600;
})->once();
$job->shouldReceive('isReleased')->andReturn(true);
$job->shouldReceive('isDeletedOrReleased')->once()->andReturn(true);

$instance->call($job, [
'command' => serialize($command = new $class($key)),
]);

$this->assertFalse($class::$handled);
}

protected function assertJobRanSuccessfully($class, $key)
{
$class::$handled = false;
$instance = new CallQueuedHandler(new Dispatcher($this->app), $this->app);

$job = m::mock(Job::class);

$job->shouldReceive('hasFailed')->once()->andReturn(false);
$job->shouldReceive('isReleased')->andReturn(false);
$job->shouldReceive('isDeletedOrReleased')->once()->andReturn(false);
$job->shouldReceive('delete')->once();

$instance->call($job, [
'command' => serialize($command = new $class($key)),
]);

$this->assertTrue($class::$handled);
}
}

class CircuitBreakerWithRedisTestJob
{
use InteractsWithQueue, Queueable;

public static $handled = false;

public function __construct($key)
{
$this->key = $key;
}

public function handle()
{
static::$handled = true;

throw new Exception;
}

public function middleware()
{
return [new ThrottlesExceptionsWithRedis(2, 10, 0, $this->key)];
}
}

class CircuitBreakerWithRedisSuccessfulJob
{
use InteractsWithQueue, Queueable;

public static $handled = false;

public function __construct($key)
{
$this->key = $key;
}

public function handle()
{
static::$handled = true;
}

public function middleware()
{
return [new ThrottlesExceptionsWithRedis(2, 10, 0, $this->key)];
}
}

0 comments on commit 35071ab

Please sign in to comment.