Skip to content

Commit

Permalink
Merge pull request #12 from Headoo/cme-clean-test
Browse files Browse the repository at this point in the history
Refresh a part of document with options --join --where --id :
php app/console headoo:elastic:populate --type=A --join=B,C --where=Field --id=42
  • Loading branch information
corentinheadoo authored Dec 15, 2017
2 parents cc5823e + 930d833 commit 0d64f71
Show file tree
Hide file tree
Showing 8 changed files with 189 additions and 51 deletions.
36 changes: 34 additions & 2 deletions Command/AbstractCommand.php
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,14 @@ abstract class AbstractCommand extends ContainerAwareCommand
protected $verbose = false;
/** @var bool $dryRun Do not make any change on ES */
protected $dryRun = false;
/** @var bool $quiet */
protected $quiet = false;
/** @var int $id */
protected $id = null;
/** @var string $where */
protected $where = null;
/** @var string $join */
protected $join = null;
/** @var string */
protected $environment;
/** @var string */
Expand All @@ -53,6 +61,7 @@ abstract class AbstractCommand extends ContainerAwareCommand
/**
* @param InputInterface $input
* @param OutputInterface $output
* @throws \Doctrine\ORM\OptimisticLockException
*/
protected function init(InputInterface $input, OutputInterface $output)
{
Expand Down Expand Up @@ -134,6 +143,22 @@ protected function readOption(InputInterface $input)
if ($input->hasOption('verbose')) {
$this->verbose = $input->getOption('verbose');
}

if ($input->hasOption('quiet')) {
$this->quiet = $input->getOption('quiet');
}

if ($input->hasOption('id')) {
$this->id = $input->getOption('id');
}

if ($input->hasOption('where')) {
$this->where = $input->getOption('where');
}

if ($input->hasOption('join')) {
$this->join = $input->getOption('join');
}
}

/**
Expand Down Expand Up @@ -208,8 +233,15 @@ protected function getRepositoryFromType($sType)
protected function getClient($sType)
{
$connection = $this->mappings[$sType]['connection'];
$client = $this->elasticSearchHelper->getClient($connection);

return $this->elasticSearchHelper->getClient($connection);
}
try {
$this->elasticSearchHelper->isConnected($client);
} catch (\Elastica\Exception\Connection\HttpException $e) {
$this->output->writeln("<error>ElasticSearch connection error. Check your configuration and your connection</error>");
throw $e;
}

return $client;
}
}
93 changes: 81 additions & 12 deletions Command/PopulateElasticCommand.php
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

namespace Headoo\ElasticSearchBundle\Command;

use Doctrine\ORM\Query;
use Symfony\Component\Console\Helper\ProgressBar;
use Symfony\Component\Console\Input\InputInterface;
use Symfony\Component\Console\Input\InputOption;
Expand All @@ -19,19 +20,40 @@ protected function configure()
->addOption('offset', null, InputOption::VALUE_OPTIONAL, 'Offset For selected Type', 0)
->addOption('type', null, InputOption::VALUE_OPTIONAL, 'Type of document you want to populate. You must to have configure it before use', null)
->addOption('threads', null, InputOption::VALUE_OPTIONAL, 'number of simultaneous threads', null)
->addOption('reset', null)
->addOption('batch', null, InputOption::VALUE_OPTIONAL, 'Number of Document per batch', null);
->addOption('reset', null, InputOption::VALUE_NONE, 'Reset the index')
->addOption('batch', null, InputOption::VALUE_OPTIONAL, 'Number of Document per batch', null)
->addOption('id', null, InputOption::VALUE_REQUIRED, 'Refresh a specific object with his Id', null)
->addOption('where', null, InputOption::VALUE_REQUIRED, 'Refresh objects with specific field ', null)
->addOption('join', null, InputOption::VALUE_REQUIRED, 'Join on another entity', null);
}

/**
* @param InputInterface $input
* @param OutputInterface $output
* @return int
* @throws \Doctrine\ORM\OptimisticLockException
*/
protected function execute(InputInterface $input, OutputInterface $output)
{
$this->init($input, $output);

if ($input->getOption('where') && !$input->getOption('id')) {
$output->writeln("<error>The option 'where' must be used with option 'id'</error>");
return self::EXIT_FAILED;
}

if ($input->getOption('id')) {
if ($input->getOption('batch') || $input->getOption('reset') || $input->getOption('threads')) {
$output->writeln("<error>The option 'id' cannot be used with options 'batch', 'reset', or 'threads'</error>");
return self::EXIT_FAILED;
}

if (!$input->getOption('type')) {
$output->writeln("<error>The option 'id' have to be used with option 'type'</error>");
return self::EXIT_FAILED;
}
}

// We add a limit per batch which equal of the batch option
if($input->getOption('batch')){
$this->limit = $this->batch ;
Expand Down Expand Up @@ -145,9 +167,10 @@ public function runParallel(ProgressBar $progressBar, array $processes, $maxPara

unset($currentProcesses[$index]);

$progression += $this->limit;
$processDone = intval($process->getOutput());
$progression += $processDone;
$progressBar->setMessage("$progression/$progressMax");
$progressBar->advance($this->limit);
$progressBar->advance($processDone);

// directly add and start new process after the previous finished
if (count($processesQueue) > 0) {
Expand All @@ -162,9 +185,9 @@ public function runParallel(ProgressBar $progressBar, array $processes, $maxPara
// continue loop while there are processes being executed or waiting for execution
} while (count($processesQueue) > 0 || count($currentProcesses) > 0);

$progressBar->setMessage("$numberOfEntities/$progressMax");
$progressBar->setProgress($numberOfEntities);
$progressBar->finish();
$progressBar->display();
$this->output->writeln('');

return $returnValue;
}
Expand All @@ -185,7 +208,7 @@ public function beginBatch($type)

for ($i = 0; $i <= $numberOfProcess; $i++) {
$_offset = $this->offset + ($this->limit * $i);
$process = new Process("php $this->consoleDir headoo:elastic:populate --type={$type} --limit={$this->limit} --offset={$_offset} " . $sOptions);
$process = new Process("php $this->consoleDir headoo:elastic:populate --type={$type} --limit={$this->limit} --offset={$_offset} --quiet " . $sOptions);
$aProcess[] = $process;
}

Expand All @@ -206,8 +229,8 @@ public function processBatch($type, $transformer)
$this->output->writeln(self::completeLine("Finish Type {$type} and Mapping"));
$this->output->writeln(self::completeLine("Start populate {$type}"));

$iResults = $this->entityManager->createQuery("SELECT COUNT(u) FROM {$this->mappings[$type]['class']} u")->getResult()[0][1];
$query = $this->entityManager->createQuery("select u from {$this->mappings[$type]['class']} u");
// Select a specific object from his ID
$query = $this->_getQuery($type, $iResults);

if($this->offset){
$query->setFirstResult($this->offset);
Expand All @@ -222,7 +245,7 @@ public function processBatch($type, $transformer)
$iterableResult = $query->iterate();

$progressBar = $this->getProgressBar($this->output, $iResults);
$progression = $this->offset;
$progression = 0;
$progressMax = $iResults + $this->offset;

$aDocuments = [];
Expand All @@ -237,18 +260,24 @@ public function processBatch($type, $transformer)
$aDocuments[]= $document;
$this->entityManager->detach($row[0]);

$progressBar->setMessage(($progression++) . "/{$progressMax}");
$progressBar->setMessage((++$progression + $this->offset) . "/{$progressMax}");
$progressBar->advance();

gc_collect_cycles();
}

$this->_bulk($objectType, $aDocuments);
$this->output->writeln(self::completeLine("Start populate '{$type}'"));

$progressBar->setProgress($iResults);
$progressBar->display();
$progressBar->finish();

$this->output->writeln('');
$this->output->writeln("<info>" . self::completeLine("Finish populate {$type}") . "</info>");
# In quite mode: just write in output the number of documents treated
if ($this->quiet) {
$this->output->writeln("$progression", OutputInterface::VERBOSITY_QUIET);
}
}

/**
Expand All @@ -273,4 +302,44 @@ private function _resetType($type)
return true;
}

/**
* @param string $type
* @param int $iResults
* @return Query
*/
private function _getQuery($type, &$iResults)
{
$id = filter_var($this->id, FILTER_SANITIZE_STRING);
$where = filter_var($this->where, FILTER_SANITIZE_STRING);
$joins = filter_var($this->join, FILTER_SANITIZE_STRING);
$entity = 'u';

# Forge clause JOIN
$clauseJoin = '';
$aJoins = explode(',', $joins);
foreach ($aJoins as $join) {
if (empty($join)) {
break;
}
$newId = ($newId ?? 0) + 1;
$newEntity = "u_$newId";
$clauseJoin .= " LEFT JOIN {$entity}.{$join} {$newEntity} ";
$entity = $newEntity;
}

# Forge clause WHERE
$clauseWhere = '';
if ($id && $where) {
$clauseWhere = " WHERE {$entity}.{$where} = '{$id}'";
}
if ($id && !$where) {
$clauseWhere = " WHERE {$entity}.id = '{$id}'";
}

# COUNT results
$iResults = $this->entityManager->createQuery("SELECT COUNT(u) FROM {$this->mappings[$type]['class']} u $clauseJoin $clauseWhere")->getResult()[0][1];

# Return Query
return $this->entityManager->createQuery("SELECT u FROM {$this->mappings[$type]['class']} u $clauseJoin $clauseWhere");
}
}
2 changes: 1 addition & 1 deletion EventListener/ElasticSearchListener.php
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ public function sendEvent(LifecycleEventArgs $args, $action)
return;
}

if (!array_key_exists('auto_event', $this->mapping[$type])) {
if (!array_key_exists('auto_event', $this->mapping[$type]) || !$this->mapping[$type]['auto_event']) {
$event = new ElasticSearchEvent($action, $entity);
$this->eventDispatcher->dispatch("headoo.elasticsearch.event", $event);
return;
Expand Down
13 changes: 12 additions & 1 deletion Helper/ElasticSearchHelper.php
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ public function getClient($connectionName)
* @param array $servers
* @return \Elastica\Client
*/
public function getCluster(array $servers)
static public function getCluster(array $servers)
{
$cluster = new Client([
'servers' => [$servers]
Expand All @@ -44,4 +44,15 @@ public function getCluster(array $servers)
return $cluster;
}

/**
* @param Client $elasticaClient
* @return bool
*/
static public function isConnected(\Elastica\Client $elasticaClient)
{
$status = $elasticaClient->getStatus();
$status->refresh();

return true;
}
}
1 change: 0 additions & 1 deletion Tests/Command/AbstractCommandTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -29,5 +29,4 @@ public function testCompleteLine()
"Completed line should be equals to LINE_LENGTH: '$sMsg'"
);
}

}
14 changes: 6 additions & 8 deletions Tests/Command/ExodusElasticCommandTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -13,27 +13,21 @@

class ExodusElasticCommandTest extends KernelTestCase
{

/** @var \Headoo\ElasticSearchBundle\Helper\ElasticSearchHelper */
private $elasticSearchHelper;

/** @var EntityManager */
private $entityManager;

/** @var Application */
protected $application;


/**
* {@inheritDoc}
* @outputBuffering disabled
*/
public function setUp()
{
parent::setUp();
self::bootKernel();

$this->entityManager = static::$kernel->getContainer()->get('doctrine')->getManager();
$this->elasticSearchHelper = static::$kernel->getContainer()->get('headoo.elasticsearch.helper');
$this->application = new Application(self::$kernel);
$this->application->setAutoExit(false);

Expand All @@ -45,7 +39,7 @@ public function testCommandFakeEntity()
$options1 = [
'command' => 'headoo:elastic:exodus',
'--batch' => 10,
'--dry-run' => true,
'--dry-run' => false,
'--verbose' => true,
'--env' => 'prod',
];
Expand All @@ -70,6 +64,10 @@ public function testCommandWrongType()
self::assertNotEquals(0, $returnValue, 'This command should failed: UNKNOWN TYPE');
}

/**
* @outputBuffering disabled
* @param array $options
*/
public function loadFixtures(array $options = [])
{
# Do not show output
Expand Down
Loading

0 comments on commit 0d64f71

Please sign in to comment.