Skip to content

Commit

Permalink
merge master in current branch
Browse files Browse the repository at this point in the history
  • Loading branch information
JonasVHG committed Oct 25, 2023
2 parents 054a5e0 + cc7f05c commit 1a04a64
Show file tree
Hide file tree
Showing 3 changed files with 16 additions and 8 deletions.
3 changes: 2 additions & 1 deletion app/ConsoleApplication.php
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,8 @@ protected function registerCommands()
$consoleApp->add(new CacheClearCommand());

$disableDelay = $this['config']['rabbitmq']['disable_delay'] ?? false;
$consoleApp->add(new ConsumeCommand('projectaanvraag:consumer', 'rabbit.connection', 'rabbit.consumer', $disableDelay));
$exchange = $this['config']['rabbitmq']['exchange'];
$consoleApp->add(new ConsumeCommand('projectaanvraag:consumer', 'rabbit.connection', 'rabbit.consumer', $exchange, $disableDelay));

// Sync culturefeed consumers with local DB
$consoleApp->add(new SyncConsumersCommand());
Expand Down
6 changes: 3 additions & 3 deletions app/Core/MessageBusProvider.php
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ public function register(Container $pimple)
$producer->setExchangeOptions(
[
'declare' => true,
'name' => 'main_exchange',
'name' => $pimple['config']['rabbitmq']['exchange'],
'type' => 'x-delayed-message',
'durable' => true,
'arguments' => new AMQPTable(
Expand All @@ -134,7 +134,7 @@ public function register(Container $pimple)
$producer->setExchangeOptions(
[
'declare' => true,
'name' => 'main_exchange',
'name' => $pimple['config']['rabbitmq']['exchange'],
'type' => 'topic',
'durable' => true,
]
Expand Down Expand Up @@ -169,7 +169,7 @@ public function register(Container $pimple)
)
);

$channel->queue_bind('projectaanvraag_failed', 'main_exchange', 'projectaanvraag_failed');
$channel->queue_bind('projectaanvraag_failed', $pimple['config']['rabbitmq']['exchange'], 'projectaanvraag_failed');

// Resolvers.
$routingKeyResolver = new AsyncCommandRoutingKeyResolver();
Expand Down
15 changes: 11 additions & 4 deletions src/Console/Command/ConsumeCommand.php
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,11 @@ class ConsumeCommand extends Command
*/
protected $connectionId;

/**
* @var string
*/
protected $exchange;

/**
* @var boolean
*/
Expand All @@ -37,13 +42,15 @@ class ConsumeCommand extends Command
* @param null|string $name
* @param $connectionId
* @param $consumerId
* @param $exchange
* @param bool $disableDelay
*/
public function __construct($name, $connectionId, $consumerId, $disableDelay = false)
public function __construct($name, $connectionId, $consumerId, $exchange, $disableDelay = false)
{
parent::__construct($name);
$this->connectionId = $connectionId;
$this->consumerId = $consumerId;
$this->exchange = $exchange;
$this->disableDelay = $disableDelay;
}

Expand Down Expand Up @@ -81,16 +88,16 @@ protected function execute(InputInterface $input, OutputInterface $output)

// Declare the exchange
if (!$this->disableDelay) {
$channel->exchange_declare('main_exchange', 'x-delayed-message', false, true, false, false, false, new AMQPTable(['x-delayed-type' => 'direct']));
$channel->exchange_declare($this->exchange, 'x-delayed-message', false, true, false, false, false, new AMQPTable(['x-delayed-type' => 'direct']));
} else {
$channel->exchange_declare('main_exchange', 'topic', false, true, false);
$channel->exchange_declare($this->exchange, 'topic', false, true, false);
}

// Declare the main queue
$channel->queue_declare('projectaanvraag', false, true, false, false, false, new AMQPTable(['routing_keys' => ['asynchronous_commands']]));

// Bind the queue to the async_commands exchange
$channel->queue_bind('projectaanvraag', 'main_exchange', 'asynchronous_commands');
$channel->queue_bind('projectaanvraag', $this->exchange, 'asynchronous_commands');

$output->writeln(' [*] Waiting for messages. To exit press CTRL+C');

Expand Down

0 comments on commit 1a04a64

Please sign in to comment.