From afdb2243f06d37e5861d6b442ffd194ed6f11fac Mon Sep 17 00:00:00 2001 From: ebrahimradi Date: Thu, 6 Apr 2023 12:06:38 +0200 Subject: [PATCH 1/2] Waiting for maintenance mode flag --- .../8-pause-consumer-if-maintenance-mode-is-active.md | 9 +++++++++ src/Consumers/Consumer.php | 4 ++++ 2 files changed, 13 insertions(+) create mode 100644 docs/advanced-usage/8-pause-consumer-if-maintenance-mode-is-active.md diff --git a/docs/advanced-usage/8-pause-consumer-if-maintenance-mode-is-active.md b/docs/advanced-usage/8-pause-consumer-if-maintenance-mode-is-active.md new file mode 100644 index 00000000..29a5678e --- /dev/null +++ b/docs/advanced-usage/8-pause-consumer-if-maintenance-mode-is-active.md @@ -0,0 +1,9 @@ +--- +title: Pause consumer if maintenance mode is active +weight: 8 +--- + +Consumer pauses gracefully if application goes to maintenance mode. +So if application goes to maintenance mode by `php artisan down`, the consumer +will stop consuming after the current message gracefully. It will continue +to consume if maintenance mode is deactivated. diff --git a/src/Consumers/Consumer.php b/src/Consumers/Consumer.php index 8200d68e..477c4b1e 100644 --- a/src/Consumers/Consumer.php +++ b/src/Consumers/Consumer.php @@ -101,6 +101,10 @@ public function consume(): void } do { + while (app()->isDownForMaintenance()) { + $sleepTime = rand(1, 5); + sleep($sleepTime); + } $this->retryable->retry(fn () => $this->doConsume()); $this->checkForRestart(); } while (! $this->maxMessagesLimitReached() && ! $this->stopRequested); From 84242d2ad64e6229f8a161ccc6e6925faa6d7ac5 Mon Sep 17 00:00:00 2001 From: ebrahimradi Date: Fri, 7 Apr 2023 09:49:05 +0200 Subject: [PATCH 2/2] added before callbacks --- docs/advanced-usage/8-before-callbacks.md | 23 ++++++++++++ ...-consumer-if-maintenance-mode-is-active.md | 9 ----- src/Config/Config.php | 7 ++++ src/Consumers/Consumer.php | 8 ++-- src/Consumers/ConsumerBuilder.php | 9 +++++ tests/Consumers/ConsumerTest.php | 37 +++++++++++++++++++ 6 files changed, 81 insertions(+), 12 deletions(-) create mode 100644 docs/advanced-usage/8-before-callbacks.md delete mode 100644 docs/advanced-usage/8-pause-consumer-if-maintenance-mode-is-active.md diff --git a/docs/advanced-usage/8-before-callbacks.md b/docs/advanced-usage/8-before-callbacks.md new file mode 100644 index 00000000..37872496 --- /dev/null +++ b/docs/advanced-usage/8-before-callbacks.md @@ -0,0 +1,23 @@ +--- +title: Before callbacks +weight: 8 +--- + +Before consuming any message, you can call any callbacks. For example to wait for +maintenance mode. Callbacks will receive consumer instance as an argument. +The callbacks get executed in the order they are defined: + +```php +$consumer = \Junges\Kafka\Facades\Kafka::createConsumer() + ->withBeforeConsuming(function($consumer) { + while (app()->isDownForMaintenance()) { + $sleepTimeInSeconds = random_int(1, 5); + sleep($sleepTimeInSeconds); + } + }); +``` + +You can add as many callback as you need, so you can divide different tasks into +different callbacks. + +If callbacks return `false` then consumer will go to stop phase. diff --git a/docs/advanced-usage/8-pause-consumer-if-maintenance-mode-is-active.md b/docs/advanced-usage/8-pause-consumer-if-maintenance-mode-is-active.md deleted file mode 100644 index 29a5678e..00000000 --- a/docs/advanced-usage/8-pause-consumer-if-maintenance-mode-is-active.md +++ /dev/null @@ -1,9 +0,0 @@ ---- -title: Pause consumer if maintenance mode is active -weight: 8 ---- - -Consumer pauses gracefully if application goes to maintenance mode. -So if application goes to maintenance mode by `php artisan down`, the consumer -will stop consuming after the current message gracefully. It will continue -to consume if maintenance mode is deactivated. diff --git a/src/Config/Config.php b/src/Config/Config.php index 5e6e10ce..80839d67 100644 --- a/src/Config/Config.php +++ b/src/Config/Config.php @@ -2,6 +2,7 @@ namespace Junges\Kafka\Config; +use Closure; use JetBrains\PhpStorm\Pure; use Junges\Kafka\Contracts\Consumer; use Junges\Kafka\Contracts\HandlesBatchConfiguration; @@ -80,6 +81,7 @@ public function __construct( private bool $stopAfterLastMessage = false, private int $restartInterval = 1000, private array $callbacks = [], + private array $beforeConsumings = [], ) { $this->batchConfig = $batchConfig ?? new NullBatchConfig(); } @@ -192,4 +194,9 @@ private function usingSasl(): bool && (strtoupper($this->securityProtocol) === static::SASL_PLAINTEXT || strtoupper($this->securityProtocol) === static::SASL_SSL); } + + public function getBeforeConsumings(): array + { + return $this->beforeConsumings; + } } diff --git a/src/Consumers/Consumer.php b/src/Consumers/Consumer.php index 477c4b1e..153f0b59 100644 --- a/src/Consumers/Consumer.php +++ b/src/Consumers/Consumer.php @@ -101,9 +101,11 @@ public function consume(): void } do { - while (app()->isDownForMaintenance()) { - $sleepTime = rand(1, 5); - sleep($sleepTime); + foreach ($this->config->getBeforeConsumings() as $beforeConsuming) { + $result = $beforeConsuming(...)(); + if ($result === false) { + break; + } } $this->retryable->retry(fn () => $this->doConsume()); $this->checkForRestart(); diff --git a/src/Consumers/ConsumerBuilder.php b/src/Consumers/ConsumerBuilder.php index be6add80..4de20340 100644 --- a/src/Consumers/ConsumerBuilder.php +++ b/src/Consumers/ConsumerBuilder.php @@ -40,6 +40,7 @@ class ConsumerBuilder implements ConsumerBuilderContract protected int $batchSizeLimit = 0; protected int $batchReleaseInterval = 0; protected bool $stopAfterLastMessage = false; + protected array $beforeConsumings = []; /** * @param string $brokers @@ -304,6 +305,13 @@ public function stopAfterLastMessage(bool $stopAfterLastMessage = true): self return $this; } + public function withBeforeConsuming(callable $callable): self + { + $this->beforeConsumings[] = $callable; + + return $this; + } + /** * @inheritDoc */ @@ -325,6 +333,7 @@ public function build(): CanConsumeMessages batchConfig: $this->getBatchConfig(), stopAfterLastMessage: $this->stopAfterLastMessage, callbacks: $this->callbacks, + beforeConsumings: $this->beforeConsumings, ); return new Consumer($config, $this->deserializer, $this->committerFactory); diff --git a/tests/Consumers/ConsumerTest.php b/tests/Consumers/ConsumerTest.php index e619d508..aad48919 100644 --- a/tests/Consumers/ConsumerTest.php +++ b/tests/Consumers/ConsumerTest.php @@ -257,4 +257,41 @@ function (KafkaConsumerMessage $message) { //finaly only one message should be consumed $this->assertEquals(1, $consumer->consumedMessagesCount()); } + + public function testItRunCallbacksBeforeConsume() + { + $fakeHandler = new FakeHandler(); + + $message = new Message(); + $message->err = 0; + $message->key = 'key'; + $message->topic_name = 'test-topic'; + $message->payload = '{"body": "message payload"}'; + $message->offset = 0; + $message->partition = 1; + $message->headers = []; + + $this->mockConsumerWithMessage($message); + + $this->mockProducer(); + + $config = new Config( + broker: 'broker', + topics: ['test-topic'], + securityProtocol: 'security', + commit: 1, + groupId: 'group', + consumer: $fakeHandler, + sasl: null, + dlq: null, + maxMessages: 1, + maxCommitRetries: 1, + beforeConsumings: [function(){var_dump('HERE');},function(){return false;},function(){var_dump('THERE');}] + ); + + $consumer = new Consumer($config, new JsonDeserializer()); + $consumer->consume(); + + $this->assertInstanceOf(ConsumedMessage::class, $fakeHandler->lastMessage()); + } }