Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

UHF-6542: Added generic hooks to skip/reset migrations #118

Merged
merged 5 commits into from
Jul 10, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions documentation/migrate.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,12 @@
# Migration

## Migration improvements

`\Drupal\helfi_api_base\Commands\MigrateHookCommands` adds two optional arguments to `migrate:import` command:

- `reset-threshold`: Resets migration status back to `Idle` if migration has been running longer than `reset-threshold`. For example, `--reset-threshold 43200` will reset migration back to `Idle` if the migration has been running for longer than 12 hours
- `interval`: Limit how often migration can be run. For example, running the migration import with `--interval 3600` will only run it once an hour, regardless how often it's called

## Migrate garbage collection

@todo
Expand Down
8 changes: 8 additions & 0 deletions drush.services.yml
Original file line number Diff line number Diff line change
@@ -1,4 +1,12 @@
services:
helfi_api_base.migrate_hook_commands:
class: \Drupal\helfi_api_base\Commands\MigrateHookCommands
arguments:
- '@plugin.manager.migration'
- '@keyvalue'
- '@datetime.time'
tags:
- { name: drush.command }
helfi_api_base.pubsub_commands:
class: \Drupal\helfi_api_base\Commands\PubSubCommands
arguments:
Expand Down
228 changes: 228 additions & 0 deletions src/Commands/MigrateHookCommands.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,228 @@
<?php

declare(strict_types = 1);

namespace Drupal\helfi_api_base\Commands;

use Consolidation\AnnotatedCommand\CommandData;
use Consolidation\AnnotatedCommand\Hooks\HookManager;
use Drupal\Component\Datetime\TimeInterface;
use Drupal\Core\KeyValueStore\KeyValueFactoryInterface;
use Drupal\migrate\Plugin\MigrationInterface;
use Drupal\migrate\Plugin\MigrationPluginManagerInterface;
use Drush\Attributes\Hook;
use Drush\Commands\DrushCommands;
use Drush\Utils\StringUtils;
use Robo\ResultData;
use Symfony\Component\Console\Command\Command;
use Symfony\Component\Console\Input\InputOption;

/**
* Migrate hook commands.
*/
final class MigrateHookCommands extends DrushCommands {

/**
* Constructs a new instance.
*
* @param \Drupal\migrate\Plugin\MigrationPluginManagerInterface $migrationPluginManager
* The migration plugin manager.
* @param \Drupal\Core\KeyValueStore\KeyValueFactoryInterface $keyValueFactory
* The key value service.
* @param \Drupal\Component\Datetime\TimeInterface $time
* The time service.
*/
public function __construct(
private readonly MigrationPluginManagerInterface $migrationPluginManager,
private readonly KeyValueFactoryInterface $keyValueFactory,
private readonly TimeInterface $time,
) {
}

/**
* Gets the last imported timestamp of a migration.
*
* @param \Drupal\migrate\Plugin\MigrationInterface $migration
* The migration.
*
* @return int
* The last imported time.
*/
private function getLastImported(MigrationInterface $migration) : int {
$lastImported = $this->keyValueFactory->get('migrate_last_imported')?->get($migration->id(), 0);
return (int) round($lastImported / 1000);
}

/**
* Checks if the migration interval has been exceeded or not.
*
* @param \Drupal\migrate\Plugin\MigrationInterface $migration
* The migration.
* @param int $seconds
* The interval time.
*
* @return bool
* TRUE if the interval has been exceeded.
*/
private function migrationIntervalExceeded(MigrationInterface $migration, int $seconds) : bool {
$intervalTime = $this->getLastImported($migration) + $seconds;
$currentTime = $this->time->getCurrentTime();

if ($currentTime > $intervalTime) {
return TRUE;
}
return FALSE;
}

/**
* Adds 'interval' and 'reset-threshold' options to migrate:import command.
*
* @param \Symfony\Component\Console\Command\Command $command
* The command.
*/
#[Hook(type: HookManager::OPTION_HOOK, target: 'migrate:import')]
public function addMigrateHookOptions(Command $command) : void {
$command->addOption(
'interval',
mode: InputOption::VALUE_OPTIONAL,
description: 'An integer value to determine how often the migration can be run',
);
$command->addOption(
'reset-threshold',
mode: InputOption::VALUE_OPTIONAL,
description: 'An integer value to determine when a stuck migration should be reset',
);
}

/**
* Constructs the migration plugins for given IDs.
*
* @param string $ids
* The migration ids.
*
* @return \Drupal\migrate\Plugin\MigrationInterface[]
* The migrations.
*/
private function getMigrations(string $ids) : array {
$migrationIds = StringUtils::csvToArray($ids);
$migrations = $this->migrationPluginManager->createInstances($migrationIds);

return $migrations ?? [];
}

/**
* Constructs a message for given migrations.
*
* @param array $migrationIds
* The migration ids.
* @param string $message
* The message.
*
* @return \Robo\ResultData
* The result.
*/
private function createResult(array $migrationIds, string $message) : ResultData {
$messages = [];

foreach ($migrationIds as $id) {
$messages[] = sprintf('<comment>[%s] %s: %s</comment>', self::class, $id, $message);
}
return new ResultData(message: implode(PHP_EOL, $messages));
}

/**
* Checks if the migration status should be reset.
*
* Reset migration status to 'idle' if the migration has been running for
* longer than the value defined in 'reset-threshold' option.
*
* For example, call 'drush migrate:import tpr_service --reset-threshold 3600'
* to reset migration if it has been running for longer than 1 hour.
*
* @param \Consolidation\AnnotatedCommand\CommandData $commandData
* The command data.
*
* @return \Robo\ResultData|null
* The result or null.
*/
#[Hook(type: HookManager::PRE_ARGUMENT_VALIDATOR, target: 'migrate:import')]
public function resetMigrationsHook(CommandData $commandData) : ?ResultData {
$threshold = (int) $commandData->input()->getOption('reset-threshold') ?: NULL;

if (!$threshold) {
return NULL;
}
$migrations = $this->getMigrations((string) $commandData->input()->getArgument('migrationIds'));

if (!$migrations) {
return NULL;
}

$resetMigrations = [];

foreach ($migrations as $migration) {
if ($migration->getStatus() === MigrationInterface::STATUS_IDLE) {
continue;
}

if ($this->migrationIntervalExceeded($migration, $threshold)) {
$resetMigrations[] = $migration->id();

// Reset migration status if it has been running for longer than the
// configured maximum.
$migration->setStatus(MigrationInterface::STATUS_IDLE);
}
}

if (!$resetMigrations) {
return NULL;
}

$result = $this->createResult($resetMigrations, 'Migration status was reset back to idle.');
$commandData->output()
->writeln($result->getMessage());

return $result;
}

/**
* Checks if migrations should be skipped.
*
* Skip the migration if it has been less than N seconds since the last
* run. This can be configured by passing 'interval' option. For example,
* call 'drush migrate:import tpr_service --interval 3600' to allow migration
* to be run once an hour.
*
* @return \Robo\ResultData|null
* The result or null.
*/
#[Hook(type: HookManager::PRE_ARGUMENT_VALIDATOR, target: 'migrate:import')]
public function skipMigrationsHook(CommandData $commandData) : ?ResultData {
$interval = (int) $commandData->input()->getOption('interval') ?: NULL;

if (!$interval) {
return NULL;
}
$migrations = $this->getMigrations((string) $commandData->input()->getArgument('migrationIds'));

if (!$migrations) {
return NULL;
}
$skippedMigrations = [];

foreach ($migrations as $migration) {
if (!$this->migrationIntervalExceeded($migration, $interval)) {
$skippedMigrations[] = $migration->id();
}
}

if (!$skippedMigrations) {
return NULL;
}
$result = $this->createResult($skippedMigrations, sprintf('Migration has been run in the past %d seconds. Skipping ...', $interval));
$commandData->output()->writeln($result->getMessage());

return $result;
}

}
6 changes: 3 additions & 3 deletions src/EventSubscriber/MigrationSubscriber.php
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ private function getEntityType(MigrationInterface $migration) : ? EntityTypeInte

return $this->implementsRemoteEntityBase($entityType) ? $entityType : NULL;
}
catch (\Exception $e) {
catch (\Exception) {
}
}
}
Expand Down Expand Up @@ -141,7 +141,7 @@ public function onPreImport(MigrateImportEvent $event) : void {
return;
}

// Increment sync counter only when we're not doing a partial migrate.
// Increment sync counter only when we're not doing a partial migration.
// Partial migrates don't save any unchanged entities, leading post-migrate
// event to delete all unchanged entities.
if ($this->isPartialMigrate()) {
Expand All @@ -168,7 +168,7 @@ public function onPreImport(MigrateImportEvent $event) : void {
/**
* {@inheritdoc}
*/
public static function getSubscribedEvents() {
public static function getSubscribedEvents() : array {
return [
'migrate.pre_import' => ['onPreImport'],
'migrate.post_import' => ['onPostImport'],
Expand Down
4 changes: 2 additions & 2 deletions src/Logger/JsonLog.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,11 @@

namespace Drupal\helfi_api_base\Logger;

use Drupal\Core\Logger\LogMessageParserInterface;
use Drupal\Core\Logger\RfcLoggerTrait;
use Drupal\Core\Logger\RfcLogLevel;
use Drupal\Core\StringTranslation\TranslatableMarkup;
use Psr\Log\LoggerInterface;
use Drupal\Core\Logger\RfcLogLevel;
use Drupal\Core\Logger\LogMessageParserInterface;

/**
* This class allows logging to stdout and stderr.
Expand Down
2 changes: 1 addition & 1 deletion tests/src/Kernel/DefaultLanguagesTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@
namespace Drupal\Tests\helfi_api_base\Kernel;

use Drupal\KernelTests\KernelTestBase;
use Drupal\Tests\helfi_api_base\Traits\LanguageManagerTrait;
use Drupal\language\Entity\ConfigurableLanguage;
use Drupal\Tests\helfi_api_base\Traits\LanguageManagerTrait;

/**
* Tests language resolver functionality.
Expand Down
Loading