Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Defining before callbacks #192

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 23 additions & 0 deletions docs/advanced-usage/8-before-callbacks.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
---
title: Before callbacks
weight: 8
---

Before consuming any message, you can call any callbacks. For example to wait for
maintenance mode. Callbacks will receive consumer instance as an argument.
The callbacks get executed in the order they are defined:

```php
$consumer = \Junges\Kafka\Facades\Kafka::createConsumer()
->withBeforeConsuming(function($consumer) {
while (app()->isDownForMaintenance()) {
$sleepTimeInSeconds = random_int(1, 5);
sleep($sleepTimeInSeconds);
}
});
```

You can add as many callback as you need, so you can divide different tasks into
different callbacks.

If callbacks return `false` then consumer will go to stop phase.
9 changes: 8 additions & 1 deletion src/Config/Config.php
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

namespace Junges\Kafka\Config;

use Closure;
use JetBrains\PhpStorm\Pure;
use Junges\Kafka\Contracts\Consumer;
use Junges\Kafka\Contracts\HandlesBatchConfiguration;
Expand Down Expand Up @@ -77,7 +78,8 @@ public function __construct(
private readonly HandlesBatchConfiguration $batchConfig = new NullBatchConfig(),
private readonly bool $stopAfterLastMessage = false,
private readonly int $restartInterval = 1000,
private readonly array $callbacks = []
private readonly array $callbacks = [],
private array $beforeConsumings = [],
) {}

public function getCommit(): int
Expand Down Expand Up @@ -188,4 +190,9 @@ private function usingSasl(): bool
&& (strtoupper($this->securityProtocol) === static::SASL_PLAINTEXT
|| strtoupper($this->securityProtocol) === static::SASL_SSL);
}

public function getBeforeConsumings(): array
{
return $this->beforeConsumings;
}
}
6 changes: 6 additions & 0 deletions src/Consumers/Consumer.php
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,12 @@ public function consume(): void
}

do {
foreach ($this->config->getBeforeConsumings() as $beforeConsuming) {
$result = $beforeConsuming(...)();
if ($result === false) {
break;
}
}
$this->retryable->retry(fn () => $this->doConsume());
$this->checkForRestart();
} while (! $this->maxMessagesLimitReached() && ! $this->stopRequested);
Expand Down
9 changes: 9 additions & 0 deletions src/Consumers/ConsumerBuilder.php
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ class ConsumerBuilder implements ConsumerBuilderContract
protected int $batchSizeLimit = 0;
protected int $batchReleaseInterval = 0;
protected bool $stopAfterLastMessage = false;
protected array $beforeConsumings = [];

protected function __construct(protected string $brokers, array $topics = [], protected ?string $groupId = null)
{
Expand Down Expand Up @@ -257,6 +258,13 @@ public function stopAfterLastMessage(bool $stopAfterLastMessage = true): self
return $this;
}

public function withBeforeConsuming(callable $callable): self
{
$this->beforeConsumings[] = $callable;

return $this;
}

/** @inheritDoc */
public function build(): MessageConsumer
{
Expand All @@ -276,6 +284,7 @@ public function build(): MessageConsumer
batchConfig: $this->getBatchConfig(),
stopAfterLastMessage: $this->stopAfterLastMessage,
callbacks: $this->callbacks,
beforeConsumings: $this->beforeConsumings,
);

return new Consumer($config, $this->deserializer, $this->committerFactory);
Expand Down
37 changes: 37 additions & 0 deletions tests/Consumers/ConsumerTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -288,4 +288,41 @@ function (ConsumerMessage $message) {
//finaly only one message should be consumed
$this->assertEquals(1, $consumer->consumedMessagesCount());
}

public function testItRunCallbacksBeforeConsume()
{
$fakeHandler = new FakeHandler();

$message = new Message();
$message->err = 0;
$message->key = 'key';
$message->topic_name = 'test-topic';
$message->payload = '{"body": "message payload"}';
$message->offset = 0;
$message->partition = 1;
$message->headers = [];

$this->mockConsumerWithMessage($message);

$this->mockProducer();

$config = new Config(
broker: 'broker',
topics: ['test-topic'],
securityProtocol: 'security',
commit: 1,
groupId: 'group',
consumer: $fakeHandler,
sasl: null,
dlq: null,
maxMessages: 1,
maxCommitRetries: 1,
beforeConsumings: [function(){var_dump('HERE');},function(){return false;},function(){var_dump('THERE');}]
);

$consumer = new Consumer($config, new JsonDeserializer());
$consumer->consume();

$this->assertInstanceOf(ConsumedMessage::class, $fakeHandler->lastMessage());
}
}