Skip to content

Commit

Permalink
[1.6.x] Fix serialize (#92)
Browse files Browse the repository at this point in the history
* 1.6 patch - Ensure json serialize does not double encode messages
* code clean
  • Loading branch information
lukecurtis93 authored Apr 7, 2022
1 parent eed46ab commit 7ef6099
Show file tree
Hide file tree
Showing 3 changed files with 45 additions and 0 deletions.
2 changes: 2 additions & 0 deletions src/Producers/Producer.php
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@ public function produce(KafkaProducerMessage $message): bool
{
$topic = $this->producer->newTopic($this->topic);

$message = clone $message;

$message = $this->serializer->serialize($message);

if (method_exists($topic, 'producev')) {
Expand Down
16 changes: 16 additions & 0 deletions tests/LaravelKafkaTestCase.php
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
use RdKafka\KafkaConsumer;
use RdKafka\Message;
use RdKafka\Producer as KafkaProducer;
use RdKafka\Conf;

class LaravelKafkaTestCase extends Orchestra
{
Expand Down Expand Up @@ -58,9 +59,24 @@ protected function mockProducer()
return $mockedProducer->getMock();
});

$this->mockKafkaProducer();
}
protected function mockKafkaProducer()
{
// We have to get a topic object as a valid response for the mock
// We stub out this code here to achieve that
$conf = new Conf();
$conf->set('log_level', 0);
$kafka = new KafkaProducer($conf);
$topic = $kafka->newTopic('test-topic');

$mockedKafkaProducer = m::mock(KafkaProducer::class)
->shouldReceive('flush')
->andReturn(RD_KAFKA_RESP_ERR_NO_ERROR)
->shouldReceive('newTopic')
->andReturn($topic)
->shouldReceive('poll')
->andReturn(RD_KAFKA_RESP_ERR_NO_ERROR)
->getMock();

$this->app->bind(KafkaProducer::class, function () use ($mockedKafkaProducer) {
Expand Down
27 changes: 27 additions & 0 deletions tests/Producers/ProducerTest.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
<?php

namespace Junges\Kafka\Tests\Producers;

use Junges\Kafka\Config\Config;
use Junges\Kafka\Message\Serializers\JsonSerializer;
use Junges\Kafka\Producers\Producer;
use Junges\Kafka\Tests\LaravelKafkaTestCase;
use Junges\Kafka\Message\Message;

class ProducerTest extends LaravelKafkaTestCase
{
public function testItDoesNotDoubleSerializeMessageWhenUsingJsonSerializer()
{
$this->mockKafkaProducer();
$producer = new Producer(new Config('broker', ['test-topic']), 'test-topic', new JsonSerializer());
$payload = ['key' => 'value'];

$message = new Message(
body: $payload,
);
$producer->produce($message);
$producer->produce($message);

$this->assertSame($payload, $message->getBody());
}
}

0 comments on commit 7ef6099

Please sign in to comment.