Skip to content

Commit

Permalink
Add symfony/lock support
Browse files Browse the repository at this point in the history
  • Loading branch information
digilist committed Oct 16, 2018
1 parent 1b91043 commit a88c9b4
Show file tree
Hide file tree
Showing 4 changed files with 125 additions and 14 deletions.
1 change: 1 addition & 0 deletions composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
"symfony/console": "^2.7 || ^3.3 || ^4.0",
"symfony/dependency-injection": "^2.7 || ^3.3 || ^4.0",
"symfony/filesystem": "^2.7 || ^3.3 || ^4.0",
"symfony/lock": "^3.4 || ^4.1",
"symfony/process": "^2.7.11 || ^3.3 || ^4.0",
"symfony/yaml": "^2.7 || ^3.3 || ^4.0"
},
Expand Down
101 changes: 95 additions & 6 deletions src/Event.php
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,9 @@
use Crunz\Pinger\PingableInterface;
use Crunz\Pinger\PingableTrait;
use SuperClosure\Serializer;
use Symfony\Component\Lock\Factory;
use Symfony\Component\Lock\Lock;
use Symfony\Component\Lock\StoreInterface;
use Symfony\Component\Process\Process;

/**
Expand Down Expand Up @@ -64,6 +67,7 @@ class Event implements PingableInterface
* @var Logger
*/
public $logger;

/**
* The event's unique identifier.
*
Expand Down Expand Up @@ -154,6 +158,21 @@ class Event implements PingableInterface
'week' => 5,
];

/**
* The symfony lock factory that is used to acquire locks. If the value is null, but preventOverlapping = true
* crunz falls back to filesystem locks.
*
* @var Factory|null
*/
private $lockFactory;

/**
* Contains the timestamp of the last lock refresh.
*
* @var int
*/
private $lastLockRefresh = 0;

/**
* Create a new event instance.
*
Expand Down Expand Up @@ -712,13 +731,19 @@ public function user($user)
/**
* Do not allow the event to overlap each other.
*
* @param string|int $safe_duration
* By default, the lock is acquired through file system locks. Alternatively, you can pass a symfony lock store
* that will be responsible for the locking.
*
* @param StoreInterface $store
*
* @return $this
*/
public function preventOverlapping()
public function preventOverlapping(StoreInterface $store = null)
{
$this->preventOverlapping = true;
if (null !== $store) {
$this->lockFactory = new Factory($store);
}

// Skip the event if it's locked (processing)
$this->skip(function () {
Expand All @@ -727,10 +752,7 @@ public function preventOverlapping()

// Delete the lock file when the event is completed
$this->after(function () {
$lockfile = $this->lockFile();
if (file_exists($lockfile)) {
unlink($lockfile);
}
$this->releaseLock();
});

return $this;
Expand Down Expand Up @@ -1018,6 +1040,13 @@ public function afterCallbacks()
*/
public function isLocked()
{
if (null !== $this->lockFactory) {
$lock = $this->createLockObject();

return $lock->isAcquired();
}

// If no symfony lock object is given, fall back to file system locks.
$pid = $this->lastPid();
$hasPid = (null !== $pid);

Expand Down Expand Up @@ -1048,9 +1077,69 @@ public function lastPid()
*/
public function lockFile()
{
if (null !== $this->lockFactory) {
throw new \BadMethodCallException('We are not using file based locking, but instead symfony/lock');
}

return rtrim(sys_get_temp_dir(), '/') . '/crunz-' . md5($this->buildCommand());
}

/**
* If this event is prevented from overlapping, this method should be called regularly to refresh the lock.
*/
public function refreshLock()
{
if (!$this->preventOverlapping) {
return;
}

if (null === $this->lockFactory) {
// In case of file based locking there is nothing to do.
}

// Refresh lock every 10s
$lockRefreshNeeded = $this->lastLockRefresh < (time() - 10);
if ($lockRefreshNeeded) {
$this->createLockObject()->refresh();
}
}

/**
* Get the symfony lock object for the task.
*
* @return Lock
*/
protected function createLockObject()
{
if (null === $this->lockFactory) {
throw new \BadMethodCallException(
'No lock factory. Please call preventOverlapping() with lock storage first.'
);
}

$ttl = 30;

return $this->lockFactory->createLock('crunz-' . md5($this->buildCommand()), $ttl);
}

/**
* Release the lock after the command completed.
*/
protected function releaseLock()
{
if (null !== $this->lockFactory) {
$lock = $this->createLockObject();
$lock->release();

return;
}

$lockfile = $this->lockFile();
if (file_exists($lockfile)) {
unlink($lockfile);
}
}

/**
* Get the default output depending on the OS.
*
Expand Down
2 changes: 2 additions & 0 deletions src/EventRunner.php
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,8 @@ protected function manageStartedEvents()

/** @var Event $event */
foreach ($events as $eventKey => $event) {
$event->refreshLock();

$proc = $event->getProcess();
if ($proc->isRunning()) {
continue;
Expand Down
35 changes: 27 additions & 8 deletions tests/Functional/PreventOverlappingTest.php
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
<?php

declare(strict_types=1);

namespace Crunz\Tests\Functional;

use Crunz\Configuration\Configuration;
Expand All @@ -15,10 +13,18 @@
use Crunz\Schedule;
use PHPUnit\Framework\TestCase;
use Symfony\Component\Console\Output\NullOutput;
use Symfony\Component\Lock\Store\FlockStore;
use Symfony\Component\Lock\StoreInterface;

final class PreventOverlappingTest extends TestCase
{
public function testPreventOverlapping()
/**
* @dataProvider lockDataProvider
* @test
*
* @param StoreInterface|null $store
*/
public function testPreventOverlapping(StoreInterface $store = null)
{
$command = PHP_BINARY . ' -r "usleep(250000);"';

Expand Down Expand Up @@ -48,7 +54,7 @@ public function testPreventOverlapping()
$this->assertEquals([$event1], $schedule1->dueEvents(new \DateTimeZone(date_default_timezone_get())));
$this->assertEquals([$event2], $schedule2->dueEvents(new \DateTimeZone(date_default_timezone_get())));

// Start schedule, so that event1 will be locked
// Start schedule1, so that event1 will be locked
$eventRunner->handle(new NullOutput(), [$schedule1]);

// Event is locked and therefore not due (even over the boundaries of multiple independent events and schedules)
Expand All @@ -65,18 +71,32 @@ public function testPreventOverlapping()

// Wait until the process finished
while ($event1->isLocked()) {
// Verify the events are still locked
$this->assertEquals([], $schedule1->dueEvents(new \DateTimeZone(date_default_timezone_get())));
$this->assertEquals([], $schedule2->dueEvents(new \DateTimeZone(date_default_timezone_get())));
$this->assertTrue($event1->isLocked());
$this->assertTrue($event2->isLocked());

$eventRunner->manageStartedEvents();
usleep(10000);
usleep(50000);
}

// Assert both locks were removed
$this->assertFalse($event1->isLocked());
$this->assertFalse($event2->isLocked());
}
}

class MyEventRunner extends EventRunner {
public function lockDataProvider()
{
return [
[null], // Default file locking
[new FlockStore()],
];
}
}

class MyEventRunner extends EventRunner
{
/**
* Manage the running processes.
*
Expand All @@ -101,7 +121,6 @@ public function manageStartedEvents()
$schedule->dismissEvent($eventKey);
}


if (!\count($schedule->events())) {
$this->invoke($schedule->afterCallbacks());
unset($this->schedules[$scheduleKey]);
Expand Down

0 comments on commit a88c9b4

Please sign in to comment.