Skip to content

Commit

Permalink
Refactor trigger (#711)
Browse files Browse the repository at this point in the history
* Refactor trigger

* Refactor HealthMonitor.php to make Timer property readonly

* Refactor HealthMonitor.php to make Timer property readonly

* Refactor Consumer.php to use null-safe operator for health monitor process

* Refactor Consumer.php constructor to use null-safe operator for health monitor process

---------

Co-authored-by: Deeka Wong <[email protected]>
  • Loading branch information
huangdijia and huangdijia authored Sep 12, 2024
1 parent 6a8554d commit d166445
Show file tree
Hide file tree
Showing 6 changed files with 105 additions and 97 deletions.
16 changes: 16 additions & 0 deletions src/Config.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
<?php

declare(strict_types=1);
/**
* This file is part of friendsofhyperf/components.
*
* @link https://github.com/friendsofhyperf/components
* @document https://github.com/friendsofhyperf/components/blob/main/README.md
* @contact [email protected]
*/

namespace FriendsOfHyperf\Trigger;

class Config extends \Hyperf\Config\Config
{
}
107 changes: 58 additions & 49 deletions src/Consumer.php
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
use FriendsOfHyperf\Trigger\Snapshot\BinLogCurrentSnapshotInterface;
use FriendsOfHyperf\Trigger\Subscriber\SnapshotSubscriber;
use FriendsOfHyperf\Trigger\Subscriber\TriggerSubscriber;
use Hyperf\Collection\Arr;
use Hyperf\Config\Config;
use Hyperf\Coordinator\Constants;
use Hyperf\Coordinator\CoordinatorManager;
use Hyperf\Coroutine\Coroutine;
Expand All @@ -31,44 +31,39 @@

class Consumer
{
protected ?string $name = null;
public readonly Config $config;

private ?HealthMonitor $healthMonitor = null;
public readonly string $name;

private ?ServerMutexInterface $serverMutex = null;
public readonly string $identifier;

private BinLogCurrentSnapshotInterface $binLogCurrentSnapshot;
public readonly ?HealthMonitor $healthMonitor;

public readonly ?ServerMutexInterface $serverMutex;

public readonly BinLogCurrentSnapshotInterface $binLogCurrentSnapshot;

private bool $stopped = false;

public function __construct(
protected SubscriberManager $subscriberManager,
protected TriggerManager $triggerManager,
protected string $connection = 'default',
protected array $options = [],
protected ?LoggerInterface $logger = null
protected readonly SubscriberManager $subscriberManager,
protected readonly TriggerManager $triggerManager,
public readonly string $connection = 'default',
array $options = [],
public readonly ?LoggerInterface $logger = null
) {
$this->name = $options['name'] ?? 'trigger.' . $connection;

$this->binLogCurrentSnapshot = make(BinLogCurrentSnapshotInterface::class, [
'consumer' => $this,
]);

if ($this->getOption('server_mutex.enable', true)) {
$this->serverMutex = make(ServerMutexInterface::class, [
'name' => 'trigger:mutex:' . $this->connection,
'owner' => Util::getInternalIp(),
'options' => $this->getOption('server_mutex', []) + ['connection' => $this->connection],
'logger' => $this->logger,
]);
}

if ($this->getOption('health_monitor.enable', true)) {
$this->healthMonitor = make(HealthMonitor::class, [
'consumer' => $this,
'logger' => $this->logger,
]);
}
$this->name = $options['name'] ?? sprintf('trigger.%s', $this->connection);
$this->identifier = $options['identifier'] ?? sprintf('trigger.%s', $this->connection);
$this->config = new Config($options);

$this->binLogCurrentSnapshot = make(BinLogCurrentSnapshotInterface::class, ['consumer' => $this]);
$this->healthMonitor = $this->config->get('health_monitor.enable', true) ? make(HealthMonitor::class, ['consumer' => $this]) : null;
$this->serverMutex = $this->config->get('server_mutex.enable', true) ? make(ServerMutexInterface::class, [
'name' => 'trigger:mutex:' . $this->connection,
'owner' => Util::getInternalIp(),
'options' => $this->config->get('server_mutex', []) + ['connection' => $this->connection],
'logger' => $this->logger,
]) : null;
}

public function start(): void
Expand All @@ -79,14 +74,12 @@ public function start(): void
$this->stopped = false;

// Health monitor start
if ($this->healthMonitor) {
$this->healthMonitor->process();
}
$this->healthMonitor?->process();

$replication = $this->makeReplication();

// Replication start
CoordinatorManager::until($this->getIdentifier())->resume();
CoordinatorManager::until($this->identifier)->resume();

$this->logger?->debug('[{connection}] Consumer started.', $context);

Expand Down Expand Up @@ -120,38 +113,56 @@ public function start(): void
}
}

/**
* @deprecated use `$this->binLogCurrentSnapshot` instead, will remove in v3.2.
*/
public function getBinLogCurrentSnapshot(): BinLogCurrentSnapshotInterface
{
return $this->binLogCurrentSnapshot;
}

/**
* @deprecated use `$this->healthMonitor` instead, will remove in v3.2.
*/
public function getHealthMonitor(): ?HealthMonitor
{
return $this->healthMonitor;
}

/**
* @deprecated use `$this->name` instead, will remove in v3.2.
*/
public function getName(): string
{
return $this->name;
}

public function getOption(?string $key = null, $default = null)
/**
* @deprecated use `$this->config->get($key, $default)` instead, will remove in v3.2.
*/
public function getOption(?string $key = null, mixed $default = null): mixed
{
if (is_null($key)) {
return $this->options;
return (fn () => $this->configs ?? [])->call($this->config);
}

return Arr::get($this->options, $key, $default);
return $this->config->get($key, $default);
}

/**
* @deprecated use `$this->connection` instead, will remove in v3.2.
*/
public function getConnection(): string
{
return $this->connection;
}

/**
* @deprecated use `$this->identifier` instead, will remove in v3.2.
*/
public function getIdentifier(): string
{
return sprintf('%s_start', $this->connection);
return $this->identifier;
}

public function stop(): void
Expand All @@ -168,34 +179,32 @@ public function isStopped(): bool
protected function makeReplication(): MySQLReplicationFactory
{
$connection = $this->connection;
// Get options
$config = (array) $this->options;
// Get databases of replication
$databasesOnly = array_replace(
$config['databases_only'] ?? [],
$this->config->get('databases_only', []),
$this->triggerManager->getDatabases($connection)
);
// Get tables of replication
$tablesOnly = array_replace(
$config['tables_only'] ?? [],
$this->config->get('tables_only', []),
$this->triggerManager->getTables($connection)
);

$configBuilder = (new ConfigBuilder())
->withUser($config['user'] ?? 'root')
->withHost($config['host'] ?? '127.0.0.1')
->withPassword($config['password'] ?? 'root')
->withPort((int) ($config['port'] ?? 3306))
->withUser($this->config->get('user', 'root'))
->withHost($this->config->get('host', '127.0.0.1'))
->withPassword($this->config->get('password', 'root'))
->withPort((int) $this->config->get('port', 3306))
->withSlaveId(random_int(10000, 9999999))
->withHeartbeatPeriod((float) ($config['heartbeat_period'] ?? 3))
->withHeartbeatPeriod((float) $this->config->get('heartbeat_period', 3))
->withDatabasesOnly($databasesOnly)
->withTablesOnly($tablesOnly);

if (method_exists($configBuilder, 'withSlaveUuid')) { // php-mysql-replication >= 8.0
$configBuilder->withSlaveUuid(Str::uuid()->toString());
}

if ($binLogCurrent = $this->getBinLogCurrentSnapshot()->get()) {
if ($binLogCurrent = $this->binLogCurrentSnapshot->get()) {
$configBuilder->withBinLogFileName($binLogCurrent->getBinFileName())
->withBinLogPosition($binLogCurrent->getBinLogPosition());

Expand Down
45 changes: 15 additions & 30 deletions src/Monitor/HealthMonitor.php
Original file line number Diff line number Diff line change
Expand Up @@ -13,57 +13,42 @@

use FriendsOfHyperf\Trigger\Consumer;
use FriendsOfHyperf\Trigger\Event\OnReplicationStop;
use FriendsOfHyperf\Trigger\Snapshot\BinLogCurrentSnapshotInterface;
use Hyperf\Coordinator\CoordinatorManager;
use Hyperf\Coordinator\Timer;
use Hyperf\Coroutine\Coroutine;
use MySQLReplication\BinLog\BinLogCurrent;
use Psr\Container\ContainerInterface;
use Psr\EventDispatcher\EventDispatcherInterface;
use Psr\Log\LoggerInterface;

class HealthMonitor
{
protected ?BinLogCurrent $binLogCurrent = null;

protected int $monitorInterval = 10;

protected int $snapShortInterval = 10;

protected string $connection;

protected BinLogCurrentSnapshotInterface $binLogCurrentSnapshot;

protected Timer $timer;

public function __construct(
protected ContainerInterface $container,
protected Consumer $consumer,
protected ?LoggerInterface $logger = null
) {
$this->connection = $consumer->getConnection();
$this->monitorInterval = (int) $consumer->getOption('health_monitor.interval', 10);
$this->snapShortInterval = (int) $consumer->getOption('snapshot.interval', 10);
$this->binLogCurrentSnapshot = $consumer->getBinLogCurrentSnapshot();
$this->timer = new Timer($logger);
public function __construct(protected ContainerInterface $container, protected Consumer $consumer)
{
$this->timer = new Timer($consumer->logger);
}

public function process(): void
{
Coroutine::create(function () {
CoordinatorManager::until($this->consumer->getIdentifier())->yield();
CoordinatorManager::until($this->consumer->identifier)->yield();

$context = ['connection' => $this->connection];
$monitorInterval = $this->consumer->config->get('health_monitor.interval', 10);
$snapShortInterval = (int) $this->consumer->config->get('snapshot.interval', 10);
$context = ['connection' => $this->consumer->connection];

// Monitor binLogCurrent
$this->timer->tick($this->monitorInterval, function () use ($context) {
$this->timer->tick($monitorInterval, function () use ($context) {
if ($this->consumer->isStopped()) {
$this->logger?->warning('[{connection}] Health monitor stopped.', $context);
$this->consumer->logger?->warning('[{connection}] Health monitor stopped.', $context);
return Timer::STOP;
}

if ($this->binLogCurrent instanceof BinLogCurrent) {
$this->logger?->debug(
$this->consumer->logger?->debug(
'[{connection}] Health monitoring, binLogCurrent: [{binlog_current}]',
$context +
[
Expand All @@ -74,28 +59,28 @@ public function process(): void
});

// Health check and set snapshot
$this->timer->tick($this->snapShortInterval, function () use ($context) {
$this->timer->tick($snapShortInterval, function () use ($context) {
if ($this->consumer->isStopped()) {
$this->logger?->warning('[{connection}] Snapshot stopped.', $context);
$this->consumer->logger?->warning('[{connection}] Snapshot stopped.', $context);
return Timer::STOP;
}

if (! $this->binLogCurrent instanceof BinLogCurrent) {
return;
}

$binLogCurrentCache = $this->binLogCurrentSnapshot->get();
$binLogCurrentCache = $this->consumer->binLogCurrentSnapshot->get();

if (
$binLogCurrentCache instanceof BinLogCurrent
&& $binLogCurrentCache->getBinLogPosition() == $this->binLogCurrent->getBinLogPosition()
) {
if ($this->container->has(EventDispatcherInterface::class)) {
$this->container->get(EventDispatcherInterface::class)?->dispatch(new OnReplicationStop($this->connection, $this->binLogCurrent));
$this->container->get(EventDispatcherInterface::class)?->dispatch(new OnReplicationStop($this->consumer->connection, $this->binLogCurrent));
}
}

$this->binLogCurrentSnapshot->set($this->binLogCurrent);
$this->consumer->binLogCurrentSnapshot->set($this->binLogCurrent);
});
});
}
Expand Down
6 changes: 3 additions & 3 deletions src/Snapshot/RedisBinLogCurrentSnapshot.php
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ public function __construct(
public function set(BinLogCurrent $binLogCurrent): void
{
$this->redis->set($this->key(), serialize($binLogCurrent));
$this->redis->expire($this->key(), (int) $this->consumer->getOption('snapshot.expires', 24 * 3600));
$this->redis->expire($this->key(), (int) $this->consumer->config->get('snapshot.expires', 24 * 3600));
}

public function get(): ?BinLogCurrent
Expand All @@ -50,8 +50,8 @@ private function key(): string
'trigger',
'snapshot',
'binLogCurrent',
$this->consumer->getOption('snapshot.version', '1.0'),
$this->consumer->getConnection(),
$this->consumer->config->get('snapshot.version', '1.0'),
$this->consumer->connection,
]);
}
}
4 changes: 2 additions & 2 deletions src/Subscriber/SnapshotSubscriber.php
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ public function __construct(protected Consumer $consumer)

protected function allEvents(EventDTO $event): void
{
if (! $this->consumer->getHealthMonitor()) {
if (! $this->consumer->healthMonitor) {
return;
}

Expand All @@ -37,6 +37,6 @@ protected function allEvents(EventDTO $event): void
return;
}

$this->consumer->getHealthMonitor()->setBinLogCurrent($binLogCurrent);
$this->consumer->healthMonitor->setBinLogCurrent($binLogCurrent);
}
}
Loading

0 comments on commit d166445

Please sign in to comment.