From 81edd04488c95793ad0743828f6fb4d6d26adbd0 Mon Sep 17 00:00:00 2001 From: ozangunalp Date: Thu, 22 Dec 2022 11:12:39 +0000 Subject: [PATCH] Kafka ui RPC creates own ObjectMapper (deserialization) Fixes #29976 (cherry picked from commit b2ac6b2d01c995836bc9be8d4ee2801ab5f53742) --- .../client/runtime/ui/KafkaUiHandler.java | 12 +++++------ .../kafka/client/runtime/ui/KafkaUiUtils.java | 21 ++++++++++++++++++- 2 files changed, 26 insertions(+), 7 deletions(-) diff --git a/extensions/kafka-client/runtime/src/main/java/io/quarkus/kafka/client/runtime/ui/KafkaUiHandler.java b/extensions/kafka-client/runtime/src/main/java/io/quarkus/kafka/client/runtime/ui/KafkaUiHandler.java index 8b7d916de765b..06aea79b5b9f5 100644 --- a/extensions/kafka-client/runtime/src/main/java/io/quarkus/kafka/client/runtime/ui/KafkaUiHandler.java +++ b/extensions/kafka-client/runtime/src/main/java/io/quarkus/kafka/client/runtime/ui/KafkaUiHandler.java @@ -24,7 +24,8 @@ public void handlePost(RoutingContext event) { endResponse(event, BAD_REQUEST, "Request body is null"); return; } - var body = event.body().asJsonObject(); + var webUtils = kafkaWebUiUtils(); + var body = webUtils.fromJson(event.body().buffer()); if (body == null) { endResponse(event, BAD_REQUEST, "Request JSON body is null"); return; @@ -34,7 +35,6 @@ public void handlePost(RoutingContext event) { var message = "OK"; var error = ""; - var webUtils = kafkaWebUiUtils(); var adminClient = kafkaAdminClient(); boolean res = false; @@ -50,7 +50,7 @@ public void handlePost(RoutingContext event) { res = true; break; case "createTopic": - var topicCreateRq = event.body().asPojo(KafkaCreateTopicRequest.class); + var topicCreateRq = webUtils.fromJson(event.body().buffer(), KafkaCreateTopicRequest.class); res = adminClient.createTopic(topicCreateRq); message = webUtils.toJson(webUtils.getTopics()); break; @@ -64,17 +64,17 @@ public void handlePost(RoutingContext event) { res = true; break; case "topicMessages": - var msgRequest = event.body().asPojo(KafkaMessagesRequest.class); + var msgRequest = webUtils.fromJson(event.body().buffer(), KafkaMessagesRequest.class); message = webUtils.toJson(webUtils.getMessages(msgRequest)); res = true; break; case "getOffset": - var request = event.body().asPojo(KafkaOffsetRequest.class); + var request = webUtils.fromJson(event.body().buffer(), KafkaOffsetRequest.class); message = webUtils.toJson(webUtils.getOffset(request)); res = true; break; case "createMessage": - var rq = event.body().asPojo(KafkaMessageCreateRequest.class); + var rq = webUtils.fromJson(event.body().buffer(), KafkaMessageCreateRequest.class); webUtils.createMessage(rq); message = "{}"; res = true; diff --git a/extensions/kafka-client/runtime/src/main/java/io/quarkus/kafka/client/runtime/ui/KafkaUiUtils.java b/extensions/kafka-client/runtime/src/main/java/io/quarkus/kafka/client/runtime/ui/KafkaUiUtils.java index bc9b3ba525dbd..6cca91a20f37e 100644 --- a/extensions/kafka-client/runtime/src/main/java/io/quarkus/kafka/client/runtime/ui/KafkaUiUtils.java +++ b/extensions/kafka-client/runtime/src/main/java/io/quarkus/kafka/client/runtime/ui/KafkaUiUtils.java @@ -2,6 +2,8 @@ import static io.quarkus.kafka.client.runtime.ui.util.ConsumerFactory.createConsumer; +import java.io.IOException; +import java.io.InputStream; import java.util.ArrayList; import java.util.Collection; import java.util.List; @@ -21,12 +23,14 @@ import org.apache.kafka.common.Node; import org.apache.kafka.common.TopicPartition; +import com.fasterxml.jackson.core.JsonParser; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.DeserializationFeature; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.SerializationFeature; import com.fasterxml.jackson.databind.json.JsonMapper; +import io.netty.buffer.ByteBufInputStream; import io.quarkus.kafka.client.runtime.KafkaAdminClient; import io.quarkus.kafka.client.runtime.ui.model.Order; import io.quarkus.kafka.client.runtime.ui.model.request.KafkaMessageCreateRequest; @@ -34,6 +38,8 @@ import io.quarkus.kafka.client.runtime.ui.model.request.KafkaOffsetRequest; import io.quarkus.kafka.client.runtime.ui.model.response.*; import io.smallrye.common.annotation.Identifier; +import io.vertx.core.buffer.Buffer; +import io.vertx.core.json.JsonObject; @Singleton public class KafkaUiUtils { @@ -229,9 +235,22 @@ public String toJson(Object o) { try { res = objectMapper.writeValueAsString(o); } catch (JsonProcessingException ex) { - //FIXME: res = ""; } return res; } + + public JsonObject fromJson(Buffer buffer) { + return new JsonObject(fromJson(buffer, Map.class)); + } + + public T fromJson(Buffer buffer, Class type) { + try { + JsonParser parser = objectMapper.createParser((InputStream) new ByteBufInputStream(buffer.getByteBuf())); + return objectMapper.readValue(parser, type); + } catch (IOException e) { + return null; + } + } + }