diff --git a/src/Producers/Producer.php b/src/Producers/Producer.php index 581eacde..3130811b 100644 --- a/src/Producers/Producer.php +++ b/src/Producers/Producer.php @@ -51,6 +51,8 @@ public function produce(KafkaProducerMessage $message): bool { $topic = $this->producer->newTopic($this->topic); + $message = clone $message; + $message = $this->serializer->serialize($message); if (method_exists($topic, 'producev')) { diff --git a/tests/LaravelKafkaTestCase.php b/tests/LaravelKafkaTestCase.php index 081e91be..796ec242 100644 --- a/tests/LaravelKafkaTestCase.php +++ b/tests/LaravelKafkaTestCase.php @@ -11,6 +11,7 @@ use RdKafka\KafkaConsumer; use RdKafka\Message; use RdKafka\Producer as KafkaProducer; +use RdKafka\Conf; class LaravelKafkaTestCase extends Orchestra { @@ -58,9 +59,24 @@ protected function mockProducer() return $mockedProducer->getMock(); }); + $this->mockKafkaProducer(); + } + protected function mockKafkaProducer() + { + // We have to get a topic object as a valid response for the mock + // We stub out this code here to achieve that + $conf = new Conf(); + $conf->set('log_level', 0); + $kafka = new KafkaProducer($conf); + $topic = $kafka->newTopic('test-topic'); + $mockedKafkaProducer = m::mock(KafkaProducer::class) ->shouldReceive('flush') ->andReturn(RD_KAFKA_RESP_ERR_NO_ERROR) + ->shouldReceive('newTopic') + ->andReturn($topic) + ->shouldReceive('poll') + ->andReturn(RD_KAFKA_RESP_ERR_NO_ERROR) ->getMock(); $this->app->bind(KafkaProducer::class, function () use ($mockedKafkaProducer) { diff --git a/tests/Producers/ProducerTest.php b/tests/Producers/ProducerTest.php new file mode 100644 index 00000000..243a7511 --- /dev/null +++ b/tests/Producers/ProducerTest.php @@ -0,0 +1,27 @@ +mockKafkaProducer(); + $producer = new Producer(new Config('broker', ['test-topic']), 'test-topic', new JsonSerializer()); + $payload = ['key' => 'value']; + + $message = new Message( + body: $payload, + ); + $producer->produce($message); + $producer->produce($message); + + $this->assertSame($payload, $message->getBody()); + } +} \ No newline at end of file