Skip to content

Commit

Permalink
fixes for v1.3.x
Browse files Browse the repository at this point in the history
  • Loading branch information
mateusjunges committed Nov 25, 2021
1 parent 9a1fcba commit 06c3844
Show file tree
Hide file tree
Showing 4 changed files with 72 additions and 57 deletions.
77 changes: 34 additions & 43 deletions src/Support/Testing/Fakes/KafkaFake.php
Original file line number Diff line number Diff line change
Expand Up @@ -6,20 +6,17 @@
use JetBrains\PhpStorm\Pure;
use Junges\Kafka\Contracts\CanPublishMessagesToKafka;
use Junges\Kafka\Contracts\KafkaProducerMessage;
use Junges\Kafka\Message\Message;
use PHPUnit\Framework\Assert as PHPUnit;

class KafkaFake implements CanPublishMessagesToKafka
{
private ProducerBuilderFake $producerBuilderFake;
private array $publishedMessages = [];

public function __construct()
{
$this->producerBuilderFake = new ProducerBuilderFake(
topic: '',
broker: ''
);

return $this->producerBuilderFake;
return $this->makeProducerBuilderFake();
}

/**
Expand All @@ -31,12 +28,7 @@ public function __construct()
*/
public function publishOn(string $topic, string $broker = null): ProducerBuilderFake
{
$this->producerBuilderFake = new ProducerBuilderFake(
topic: $topic,
broker: $broker
);

return $this->producerBuilderFake;
return $this->makeProducerBuilderFake($topic, $broker);
}

/**
Expand Down Expand Up @@ -78,23 +70,10 @@ public function assertPublishedTimes(int $times = 1, KafkaProducerMessage $messa
*/
public function assertPublishedOn(string $topic, KafkaProducerMessage $message = null, callable $callback = null)
{
if ($message === null) {
$this->assertPublished(null, function ($messageArray, $publishedTopic) use ($topic) {
return $topic === $publishedTopic;
});

return;
}

$this->assertPublished($message, function ($messageArray, $publishedTopic) use ($callback, $topic, $message) {
if ($publishedTopic !== $topic) {
return false;
}

return $callback
? $callback($message, $messageArray)
: true;
});
PHPUnit::assertTrue(
condition: $this->published($message, $callback)->count() > 0,
message: "The expected message was not published."
);
}

/**
Expand All @@ -107,15 +86,12 @@ public function assertPublishedOn(string $topic, KafkaProducerMessage $message =
*/
public function assertPublishedOnTimes(string $topic, int $times = 1, KafkaProducerMessage $message = null, callable $callback = null)
{
$this->assertPublishedTimes($times, $message, function ($messageArray, $publishedTopic) use ($callback, $topic, $message) {
if ($publishedTopic !== $topic) {
return false;
}
$count = $this->published($message, $callback)->count();

return $callback
? $callback($message, $messageArray)
: true;
});
PHPUnit::assertTrue(
condition: $count === $times,
message: "Kafka published {$count} messages instead of {$times}."
);
}

/**
Expand All @@ -126,6 +102,18 @@ public function assertNothingPublished()
PHPUnit::assertEmpty($this->getPublishedMessages(), 'Messages were published unexpectedly.');
}

private function makeProducerBuilderFake(string $topic = '', ?string $broker = null): ProducerBuilderFake
{
$this->producerBuilderFake = new ProducerBuilderFake(
topic: $topic,
broker: $broker
);

$this->producerBuilderFake->withProduceCallback(fn (Message $message) => $this->publishedMessages[] = $message);

return $this->producerBuilderFake;
}

/**
* Get all messages matching a truth-test callback.
*
Expand All @@ -139,13 +127,16 @@ private function published(KafkaProducerMessage $message = null, $callback = nul
return collect();
}

$callback = $callback ?: function () {
return true;
};
return collect($this->getPublishedMessages())->filter(function (Message $publishedMessage) use ($message, $callback) {
if ($callback !== null) {
$callback($publishedMessage);
}

if ($message !== null) {
return json_encode($publishedMessage->toArray()) === json_encode($message->toArray());
}

return collect($this->getPublishedMessages())->filter(function ($_, $topic) use ($message, $callback) {
return $callback($message, $topic);
return true;
});
}

Expand All @@ -168,6 +159,6 @@ private function hasPublished(): bool
#[Pure]
private function getPublishedMessages(): array
{
return $this->producerBuilderFake->getProducer()->getPublishedMessages();
return $this->publishedMessages;
}
}
30 changes: 24 additions & 6 deletions src/Support/Testing/Fakes/ProducerBuilderFake.php
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

namespace Junges\Kafka\Support\Testing\Fakes;

use Closure;
use Junges\Kafka\Config\Config;
use Junges\Kafka\Config\Sasl;
use Junges\Kafka\Contracts\CanProduceMessages;
Expand All @@ -16,6 +17,7 @@ class ProducerBuilderFake implements CanProduceMessages
private ProducerFake $producerFake;
private MessageSerializer $serializer;
private ?Sasl $saslConfig = null;
private ?Closure $producerCallback = null;

public function __construct(
private string $topic,
Expand Down Expand Up @@ -46,6 +48,13 @@ public static function create(string $topic, string $broker = null): self
return new ProducerBuilderFake($broker, $topic);
}

public function withProduceCallback(callable $callback): self
{
$this->producerCallback = $callback;

return $this;
}

public function withConfigOption(string $name, string $option): self
{
$this->options[$name] = $option;
Expand Down Expand Up @@ -185,6 +194,20 @@ public function getProducer(): ProducerFake
return $this->producerFake;
}

private function makeProducer(Config $config): ProducerFake
{
$this->producerFake = app(ProducerFake::class, [
'config' => $config,
'topic' => $this->getTopic(),
]);

if ($this->producerCallback) {
$this->producerFake->withProduceCallback($this->producerCallback);
}

return $this->producerFake;
}

/**
* Build the producer.
*
Expand All @@ -199,11 +222,6 @@ private function build(): ProducerFake
customOptions: $this->options
);

$this->producerFake = app(ProducerFake::class, [
'config' => $conf,
'topic' => $this->topic,
]);

return $this->producerFake;
return $this->makeProducer($conf);
}
}
19 changes: 12 additions & 7 deletions src/Support/Testing/Fakes/ProducerFake.php
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,14 @@

namespace Junges\Kafka\Support\Testing\Fakes;

use Closure;
use Junges\Kafka\Config\Config;
use Junges\Kafka\Message\Message;
use RdKafka\Conf;

class ProducerFake
{
private array $messages = [];
private ?Closure $producerCallback = null;

public function __construct(
private Config $config,
Expand All @@ -21,15 +22,19 @@ public function setConf(array $options = []): Conf
return new Conf();
}

public function produce(Message $message): bool
public function withProduceCallback(callable $callback): self
{
$this->messages[$this->topic][] = json_encode($message->toArray());

return true;
$this->producerCallback = $callback;
return $this;
}

public function getPublishedMessages(): array
public function produce(Message $message): bool
{
return $this->messages;
if ($this->producerCallback !== null) {
$callback = $this->producerCallback;
$callback($message);
}

return true;
}
}
3 changes: 2 additions & 1 deletion tests/KafkaFakeTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -67,12 +67,13 @@ public function testAssertPublishedTimes()
->withBodyKey('test', ['test'])
->withHeaders(['custom' => 'header'])
->withKafkaKey(Str::uuid()->toString());

$producer->send();

$this->fake->assertPublishedTimes(1, $producer->getMessage());

try {
$this->fake->assertPublishedTimes(2, new Message('foo'));
$this->fake->assertPublishedTimes(2);
} catch (ExpectationFailedException $exception) {
$this->assertThat($exception, new ExceptionMessage('Kafka published 1 messages instead of 2.'));
}
Expand Down

0 comments on commit 06c3844

Please sign in to comment.