Skip to content

Commit

Permalink
Merge pull request #38728 from phillip-kruger/encoding
Browse files Browse the repository at this point in the history
Encode Kafka messages with UTF8
  • Loading branch information
phillip-kruger authored Feb 12, 2024
2 parents 99e8d6d + 88409ed commit 7fd19a8
Show file tree
Hide file tree
Showing 2 changed files with 9 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -249,8 +249,9 @@ private void assertRequestedPartitionsExist(String topicName, Collection<Integer
}

public void createMessage(KafkaMessageCreateRequest request) {
var record = new ProducerRecord<>(request.getTopic(), request.getPartition(), Bytes.wrap(request.getKey().getBytes()),
Bytes.wrap(request.getValue().getBytes()));
var record = new ProducerRecord<>(request.getTopic(), request.getPartition(),
Bytes.wrap(request.getKey().getBytes(StandardCharsets.UTF_8)),
Bytes.wrap(request.getValue().getBytes(StandardCharsets.UTF_8)));

Optional.ofNullable(request.getHeaders())
.orElseGet(Collections::emptyMap)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,12 @@ public KafkaMessage convert(ConsumerRecord<Bytes, Bytes> message) {
message.partition(),
message.offset(),
message.timestamp(),
Optional.ofNullable(message.key()).map(Bytes::toString).orElse(null),
Optional.ofNullable(message.value()).map(Bytes::toString).orElse(null),
Optional.ofNullable(message.key()).map((t) -> {
return new String(t.get(), StandardCharsets.UTF_8);
}).orElse(null),
Optional.ofNullable(message.value()).map((t) -> {
return new String(t.get(), StandardCharsets.UTF_8);
}).orElse(null),
headers(message));
}

Expand Down

0 comments on commit 7fd19a8

Please sign in to comment.