Skip to content

Commit

Permalink
Improve consistency on encode/serialize terms
Browse files Browse the repository at this point in the history
  • Loading branch information
rtuin committed Nov 29, 2021
1 parent 340b675 commit b55772a
Show file tree
Hide file tree
Showing 6 changed files with 36 additions and 36 deletions.
6 changes: 3 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ Follow these docs to install this package and start using kafka with ease.
- [4.12 Setting kafka consumer configuration options](#setting-kafka-configuration-options)
- [4.13 Building the consumer](#building-the-consumer)
- [4.14 Consuming the kafka message](#consuming-the-kafka-messages)
- [5. Using custom encoders and decoders](#using-custom-encodersdecoders)
- [5. Using custom serializers/deserializers](#using-custom-serializersdeserializers)
- [6. Using `Kafka::fake()`method](#using-kafkafake)
- [6.1 `assertPublished` method](#assertpublished-method)
- [6.2 `assertPublishedOn` method](#assertpublishedon-method)
Expand Down Expand Up @@ -507,7 +507,8 @@ $consumer = \Junges\Kafka\Facades\Kafka::createConsumer()->build();
$consumer->consume();
```

# Using custom encoders/decoders
<a name="using-custom-serializersdeserializers"></a>
# Using custom serializers/deserializers
Serialization is the process of converting messages to bytes. Deserialization is the inverse process - converting a stream of bytes into and object. In a nutshell,
it transforms the content into readable and interpretable information.
Basically, in order to prepare the message for transmission from the producer we use serializers. This package supports three serializers out of the box:
Expand Down Expand Up @@ -759,4 +760,3 @@ The Laravel Kafka package is open-sourced software licenced under the [MIT][mit]
[contributing]: .github/CONTRIBUTING.md
[license]: LICENSE
[mit]: https://opensource.org/licenses/MIT

10 changes: 5 additions & 5 deletions src/Consumers/Consumer.php
Original file line number Diff line number Diff line change
Expand Up @@ -44,21 +44,21 @@ class Consumer
private Committer $committer;
private Retryable $retryable;
private CommitterFactory $committerFactory;
private MessageDeserializer $decoder;
private MessageDeserializer $deserializer;
private bool $stopRequested = false;
private ?Closure $onStopConsume = null;

/**
* @param \Junges\Kafka\Config\Config $config
* @param MessageDeserializer $decoder
* @param MessageDeserializer $deserializer
*/
public function __construct(private Config $config, MessageDeserializer $decoder)
public function __construct(private Config $config, MessageDeserializer $deserializer)
{
$this->logger = app(Logger::class);
$this->messageCounter = new MessageCounter($config->getMaxMessages());
$this->retryable = new Retryable(new NativeSleeper(), 6, self::TIMEOUT_ERRORS);
$this->committerFactory = new CommitterFactory($this->messageCounter);
$this->decoder = $decoder;
$this->deserializer = $deserializer;
}

/**
Expand Down Expand Up @@ -157,7 +157,7 @@ private function executeMessage(Message $message): void
try {
$consumedMessage = $this->getConsumerMessage($message);

$this->config->getConsumer()->handle($this->decoder->deserialize($consumedMessage));
$this->config->getConsumer()->handle($this->deserializer->deserialize($consumedMessage));

$success = true;
} catch (Throwable $throwable) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
use Junges\Kafka\Message\Deserializers\AvroDeserializer;
use Junges\Kafka\Tests\LaravelKafkaTestCase;

class AvroDecoderTest extends LaravelKafkaTestCase
class AvroDeserializerTest extends LaravelKafkaTestCase
{
public function testDeserializeTombstone()
{
Expand All @@ -24,9 +24,9 @@ public function testDeserializeTombstone()
$recordSerializer = $this->getMockBuilder(RecordSerializer::class)->disableOriginalConstructor()->getMock();
$recordSerializer->expects($this->never())->method('decodeMessage');

$decoder = new AvroDeserializer($registry, $recordSerializer);
$deserializer = new AvroDeserializer($registry, $recordSerializer);

$result = $decoder->deserialize($message);
$result = $deserializer->deserialize($message);

$this->assertInstanceOf(KafkaConsumerMessage::class, $result);
$this->assertNull($result->getBody());
Expand Down Expand Up @@ -63,9 +63,9 @@ public function testDeserializeWithSchema()
)
->willReturnOnConsecutiveCalls(['test'], 'decoded-key');

$decoder = new AvroDeserializer($registry, $recordSerializer);
$deserializer = new AvroDeserializer($registry, $recordSerializer);

$result = $decoder->deserialize($message);
$result = $deserializer->deserialize($message);

$this->assertInstanceOf(KafkaConsumerMessage::class, $result);
$this->assertSame(['test'], $result->getBody());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,14 @@
use Junges\Kafka\Message\Deserializers\JsonDeserializer;
use Junges\Kafka\Tests\LaravelKafkaTestCase as TestCase;

class JsonDecoderTest extends TestCase
class JsonDeserializerTest extends TestCase
{
public function testDeserialize(): void
{
$message = $this->getMockForAbstractClass(KafkaConsumerMessage::class);
$message->expects($this->once())->method('getBody')->willReturn('{"name":"foo"}');
$decoder = new JsonDeserializer();
$result = $decoder->deserialize($message);
$deserializer = new JsonDeserializer();
$result = $deserializer->deserialize($message);

$this->assertInstanceOf(KafkaConsumerMessage::class, $result);
$this->assertEquals(['name' => 'foo'], $result->getBody());
Expand All @@ -26,10 +26,10 @@ public function testDeserializeNonJson(): void
{
$message = $this->getMockForAbstractClass(KafkaConsumerMessage::class);
$message->expects($this->once())->method('getBody')->willReturn('test');
$decoder = new JsonDeserializer();
$deserializer = new JsonDeserializer();

$this->expectException(\JsonException::class);

$decoder->deserialize($message);
$deserializer->deserialize($message);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
use Junges\Kafka\Message\Serializers\AvroSerializer;
use Junges\Kafka\Tests\LaravelKafkaTestCase;

class AvroEncoderTest extends LaravelKafkaTestCase
class AvroSerializerTest extends LaravelKafkaTestCase
{
public function testSerializeTombstone()
{
Expand All @@ -24,9 +24,9 @@ public function testSerializeTombstone()
$recordSerializer = $this->getMockBuilder(RecordSerializer::class)->disableOriginalConstructor()->getMock();
$recordSerializer->expects($this->never())->method('encodeRecord');

$encoder = new AvroSerializer($registry, $recordSerializer);
$serializer = new AvroSerializer($registry, $recordSerializer);

$result = $encoder->serialize($producerMessage);
$result = $serializer->serialize($producerMessage);

$this->assertInstanceOf(KafkaProducerMessage::class, $result);
$this->assertSame($producerMessage, $result);
Expand Down Expand Up @@ -56,8 +56,8 @@ public function testSerializeWithoutSchemaDefinition()

$recordSerializer = $this->getMockBuilder(RecordSerializer::class)->disableOriginalConstructor()->getMock();

$encoder = new AvroSerializer($registry, $recordSerializer);
$encoder->serialize($producerMessage);
$serializer = new AvroSerializer($registry, $recordSerializer);
$serializer->serialize($producerMessage);
}

public function testSerializeSuccessWithSchema()
Expand Down Expand Up @@ -91,9 +91,9 @@ public function testSerializeSuccessWithSchema()
[$avroSchema->getName(), $avroSchema->getDefinition(), 'test-key']
)->willReturnOnConsecutiveCalls('encodedValue', 'encodedKey');

$encoder = new AvroSerializer($registry, $recordSerializer);
$serializer = new AvroSerializer($registry, $recordSerializer);

$this->assertSame($producerMessage, $encoder->serialize($producerMessage));
$this->assertSame($producerMessage, $serializer->serialize($producerMessage));
}

public function testSerializeKeyMode()
Expand Down Expand Up @@ -121,9 +121,9 @@ public function testSerializeKeyMode()
$recordSerializer = $this->getMockBuilder(RecordSerializer::class)->disableOriginalConstructor()->getMock();
$recordSerializer->expects($this->once())->method('encodeRecord')->with($avroSchema->getName(), $avroSchema->getDefinition(), 'test-key')->willReturn('encodedKey');

$encoder = new AvroSerializer($registry, $recordSerializer);
$serializer = new AvroSerializer($registry, $recordSerializer);

$this->assertSame($producerMessage, $encoder->serialize($producerMessage));
$this->assertSame($producerMessage, $serializer->serialize($producerMessage));
}

public function testSerializeBodyMode()
Expand Down Expand Up @@ -151,18 +151,18 @@ public function testSerializeBodyMode()
$recordSerializer = $this->getMockBuilder(RecordSerializer::class)->disableOriginalConstructor()->getMock();
$recordSerializer->expects($this->once())->method('encodeRecord')->with($avroSchema->getName(), $avroSchema->getDefinition(), [])->willReturn('encodedBody');

$encoder = new AvroSerializer($registry, $recordSerializer);
$serializer = new AvroSerializer($registry, $recordSerializer);

$this->assertSame($producerMessage, $encoder->serialize($producerMessage));
$this->assertSame($producerMessage, $serializer->serialize($producerMessage));
}

public function testGetRegistry()
{
$registry = $this->getMockForAbstractClass(AvroSchemaRegistry::class);

$recordSerializer = $this->getMockBuilder(RecordSerializer::class)->disableOriginalConstructor()->getMock();
$encoder = new AvroSerializer($registry, $recordSerializer);
$serializer = new AvroSerializer($registry, $recordSerializer);

$this->assertSame($registry, $encoder->getRegistry());
$this->assertSame($registry, $serializer->getRegistry());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,28 +7,28 @@
use Junges\Kafka\Message\Serializers\JsonSerializer;
use Junges\Kafka\Tests\LaravelKafkaTestCase as TestCase;

class JsonEncoderTest extends TestCase
class JsonSerializerTest extends TestCase
{
public function testSerialize()
{
$message = $this->getMockForAbstractClass(KafkaProducerMessage::class);
$message->expects($this->once())->method('getBody')->willReturn(['name' => 'foo']);
$message->expects($this->once())->method('withBody')->with('{"name":"foo"}')->willReturn($message);

$encoder = $this->getMockForAbstractClass(JsonSerializer::class);
$serializer = $this->getMockForAbstractClass(JsonSerializer::class);

$this->assertSame($message, $encoder->serialize($message));
$this->assertSame($message, $serializer->serialize($message));
}

public function testSerializeThrowsException(): void
{
$message = $this->getMockForAbstractClass(KafkaProducerMessage::class);
$message->expects($this->once())->method('getBody')->willReturn(chr(255));

$encoder = $this->getMockForAbstractClass(JsonSerializer::class);
$serializer = $this->getMockForAbstractClass(JsonSerializer::class);

$this->expectException(JsonException::class);

$encoder->serialize($message);
$serializer->serialize($message);
}
}

0 comments on commit b55772a

Please sign in to comment.