Skip to content

Commit

Permalink
Added validation to ensure consumer does not subscribe to same topic …
Browse files Browse the repository at this point in the history
…twice
  • Loading branch information
mateusjunges committed Oct 20, 2021
1 parent acb755f commit f1ab25c
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 5 deletions.
15 changes: 10 additions & 5 deletions src/Consumers/ConsumerBuilder.php
Original file line number Diff line number Diff line change
Expand Up @@ -31,19 +31,22 @@ class ConsumerBuilder
* @param array $topics
* @param string|null $groupId
*/
private function __construct(string $brokers, array $topics, string $groupId = null)
private function __construct(string $brokers, array $topics = [], string $groupId = null)
{
foreach ($topics as $topic) {
$this->validateTopic($topic);
if (count($topics) > 0) {
foreach ($topics as $topic) {
$this->validateTopic($topic);
}
}

$this->brokers = $brokers;
$this->groupId = $groupId;
$this->topics = $topics;
$this->topics = array_unique($topics);

$this->commit = 1;
$this->handler = function () {
};

$this->maxMessages = -1;
$this->maxCommitRetries = 6;
$this->middlewares = [];
Expand Down Expand Up @@ -84,7 +87,9 @@ public function subscribe(...$topics): self
foreach ($topics as $topic) {
$this->validateTopic($topic);

$this->topics[] = $topic;
if (! collect($this->topics)->contains($topic)) {
$this->topics[] = $topic;
}
}

return $this;
Expand Down
11 changes: 11 additions & 0 deletions tests/Consumers/ConsumerBuilderTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,17 @@ public function testItCanSubscribeToATopic()
$this->assertEquals(['foo'], $topics);
}

public function testItDoesNotSubscribeToATopicTwice()
{
$consumer = ConsumerBuilder::create('broker');

$consumer->subscribe('foo', 'foo');

$topics = $this->getPropertyWithReflection('topics', $consumer);

$this->assertEquals(['foo'], $topics);
}

public function testICanChangeDeserializersOnTheFly()
{
$consumer = ConsumerBuilder::create('broker');
Expand Down

0 comments on commit f1ab25c

Please sign in to comment.