From 5d02dd4a5104264214f8268c6cd54c9e66a14643 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?C=C3=A1stor=20Rodr=C3=ADguez?= Date: Mon, 4 Jan 2021 16:11:10 +0100 Subject: [PATCH] Refactor in preparation for #88 support of response-level offset --- ...lectorsUtils.java => CollectionUtils.java} | 10 +- .../connect/http/ack/ConfirmationWindow.java | 2 +- .../JacksonKvRecordHttpResponseParser.java | 24 +-- ...cksonKvRecordHttpResponseParserConfig.java | 7 +- .../response/jackson/JacksonRecordParser.java | 53 +++--- .../jackson/JacksonResponseRecordParser.java | 84 ++++++++++ ...tyResolver.java => JacksonSerializer.java} | 27 +++- .../response/jackson/model/JacksonRecord.java | 53 ++++++ .../connect/common/CollectionUtilsTest.java | 62 +++++++ ...nKvRecordHttpResponseParserConfigTest.java | 10 +- ...JacksonKvRecordHttpResponseParserTest.java | 57 +++---- .../jackson/JacksonRecordParserTest.java | 84 ++++------ .../JacksonResponseRecordParserTest.java | 153 ++++++++++++++++++ ...erTest.java => JacksonSerializerTest.java} | 56 +++++-- 14 files changed, 520 insertions(+), 162 deletions(-) rename kafka-connect-http/src/main/java/com/github/castorm/kafka/connect/common/{CollectorsUtils.java => CollectionUtils.java} (81%) create mode 100644 kafka-connect-http/src/main/java/com/github/castorm/kafka/connect/http/response/jackson/JacksonResponseRecordParser.java rename kafka-connect-http/src/main/java/com/github/castorm/kafka/connect/http/response/jackson/{JacksonPropertyResolver.java => JacksonSerializer.java} (72%) create mode 100644 kafka-connect-http/src/main/java/com/github/castorm/kafka/connect/http/response/jackson/model/JacksonRecord.java create mode 100644 kafka-connect-http/src/test/java/com/github/castorm/kafka/connect/common/CollectionUtilsTest.java create mode 100644 kafka-connect-http/src/test/java/com/github/castorm/kafka/connect/http/response/jackson/JacksonResponseRecordParserTest.java rename kafka-connect-http/src/test/java/com/github/castorm/kafka/connect/http/response/jackson/{JacksonPropertyResolverTest.java => JacksonSerializerTest.java} (55%) diff --git a/kafka-connect-http/src/main/java/com/github/castorm/kafka/connect/common/CollectorsUtils.java b/kafka-connect-http/src/main/java/com/github/castorm/kafka/connect/common/CollectionUtils.java similarity index 81% rename from kafka-connect-http/src/main/java/com/github/castorm/kafka/connect/common/CollectorsUtils.java rename to kafka-connect-http/src/main/java/com/github/castorm/kafka/connect/common/CollectionUtils.java index 833779cd..576b7454 100644 --- a/kafka-connect-http/src/main/java/com/github/castorm/kafka/connect/common/CollectorsUtils.java +++ b/kafka-connect-http/src/main/java/com/github/castorm/kafka/connect/common/CollectionUtils.java @@ -22,14 +22,16 @@ import lombok.experimental.UtilityClass; +import java.util.HashMap; import java.util.LinkedHashMap; +import java.util.Map; import java.util.function.Function; import java.util.stream.Collector; import static java.util.stream.Collectors.toMap; @UtilityClass -public class CollectorsUtils { +public class CollectionUtils { public static Collector> toLinkedHashMap( Function keyMapper, @@ -44,4 +46,10 @@ public class CollectorsUtils { LinkedHashMap::new ); } + + public static Map merge(Map mapA, Map mapB) { + Map merged = new HashMap<>(mapA); + mapB.forEach((key, value) -> merged.merge(key, value, (k, v) -> v)); + return merged; + } } diff --git a/kafka-connect-http/src/main/java/com/github/castorm/kafka/connect/http/ack/ConfirmationWindow.java b/kafka-connect-http/src/main/java/com/github/castorm/kafka/connect/http/ack/ConfirmationWindow.java index 69bad39d..b0a05f08 100644 --- a/kafka-connect-http/src/main/java/com/github/castorm/kafka/connect/http/ack/ConfirmationWindow.java +++ b/kafka-connect-http/src/main/java/com/github/castorm/kafka/connect/http/ack/ConfirmationWindow.java @@ -27,7 +27,7 @@ import java.util.Map; import java.util.Optional; -import static com.github.castorm.kafka.connect.common.CollectorsUtils.toLinkedHashMap; +import static com.github.castorm.kafka.connect.common.CollectionUtils.toLinkedHashMap; import static java.util.function.Function.identity; @Slf4j diff --git a/kafka-connect-http/src/main/java/com/github/castorm/kafka/connect/http/response/jackson/JacksonKvRecordHttpResponseParser.java b/kafka-connect-http/src/main/java/com/github/castorm/kafka/connect/http/response/jackson/JacksonKvRecordHttpResponseParser.java index 53ccf522..ddb00e76 100644 --- a/kafka-connect-http/src/main/java/com/github/castorm/kafka/connect/http/response/jackson/JacksonKvRecordHttpResponseParser.java +++ b/kafka-connect-http/src/main/java/com/github/castorm/kafka/connect/http/response/jackson/JacksonKvRecordHttpResponseParser.java @@ -20,10 +20,10 @@ * #L% */ -import com.fasterxml.jackson.databind.JsonNode; import com.github.castorm.kafka.connect.http.model.HttpResponse; import com.github.castorm.kafka.connect.http.model.Offset; import com.github.castorm.kafka.connect.http.record.model.KvRecord; +import com.github.castorm.kafka.connect.http.response.jackson.model.JacksonRecord; import com.github.castorm.kafka.connect.http.response.spi.KvRecordHttpResponseParser; import com.github.castorm.kafka.connect.http.response.timestamp.spi.TimestampParser; import lombok.RequiredArgsConstructor; @@ -43,7 +43,7 @@ public class JacksonKvRecordHttpResponseParser implements KvRecordHttpResponsePa private final Function, JacksonKvRecordHttpResponseParserConfig> configFactory; - private JacksonRecordParser recordParser; + private JacksonResponseRecordParser responseParser; private TimestampParser timestampParser; @@ -54,27 +54,27 @@ public JacksonKvRecordHttpResponseParser() { @Override public void configure(Map configs) { JacksonKvRecordHttpResponseParserConfig config = configFactory.apply(configs); - recordParser = config.getRecordParser(); + responseParser = config.getResponseParser(); timestampParser = config.getTimestampParser(); } @Override public List parse(HttpResponse response) { - return recordParser.getRecords(response.getBody()) + return responseParser.getRecords(response.getBody()) .map(this::map) .collect(toList()); } - private KvRecord map(JsonNode node) { + private KvRecord map(JacksonRecord record) { - Map offsets = recordParser.getOffsets(node); + Map offsets = record.getOffset(); - String key = recordParser.getKey(node) + String key = ofNullable(record.getKey()) .map(Optional::of) .orElseGet(() -> ofNullable(offsets.get("key")).map(String.class::cast)) - .orElseGet(() -> generateConsistentKey(node)); + .orElseGet(() -> generateConsistentKey(record.getBody())); - Optional timestamp = recordParser.getTimestamp(node) + Optional timestamp = ofNullable(record.getTimestamp()) .map(Optional::of) .orElseGet(() -> ofNullable(offsets.get("timestamp")).map(String.class::cast)) .map(timestampParser::parse); @@ -85,12 +85,12 @@ private KvRecord map(JsonNode node) { return KvRecord.builder() .key(key) - .value(recordParser.getValue(node)) + .value(record.getBody()) .offset(offset) .build(); } - private String generateConsistentKey(JsonNode node) { - return nameUUIDFromBytes(node.toString().getBytes()).toString(); + private static String generateConsistentKey(String body) { + return nameUUIDFromBytes(body.getBytes()).toString(); } } diff --git a/kafka-connect-http/src/main/java/com/github/castorm/kafka/connect/http/response/jackson/JacksonKvRecordHttpResponseParserConfig.java b/kafka-connect-http/src/main/java/com/github/castorm/kafka/connect/http/response/jackson/JacksonKvRecordHttpResponseParserConfig.java index ade4b32a..1db325b5 100644 --- a/kafka-connect-http/src/main/java/com/github/castorm/kafka/connect/http/response/jackson/JacksonKvRecordHttpResponseParserConfig.java +++ b/kafka-connect-http/src/main/java/com/github/castorm/kafka/connect/http/response/jackson/JacksonKvRecordHttpResponseParserConfig.java @@ -36,13 +36,16 @@ public class JacksonKvRecordHttpResponseParserConfig extends AbstractConfig { private static final String RECORD_TIMESTAMP_PARSER_CLASS = "http.response.record.timestamp.parser"; - private final JacksonRecordParser recordParser; + private final JacksonResponseRecordParser responseParser; private final TimestampParser timestampParser; JacksonKvRecordHttpResponseParserConfig(Map originals) { super(config(), originals); - recordParser = new JacksonRecordParser(); + JacksonSerializer serializer = new JacksonSerializer(); + JacksonRecordParser recordParser = new JacksonRecordParser(serializer); recordParser.configure(originals); + responseParser = new JacksonResponseRecordParser(recordParser, serializer); + responseParser.configure(originals); timestampParser = getConfiguredInstance(RECORD_TIMESTAMP_PARSER_CLASS, TimestampParser.class); } diff --git a/kafka-connect-http/src/main/java/com/github/castorm/kafka/connect/http/response/jackson/JacksonRecordParser.java b/kafka-connect-http/src/main/java/com/github/castorm/kafka/connect/http/response/jackson/JacksonRecordParser.java index e5f89afb..46646a40 100644 --- a/kafka-connect-http/src/main/java/com/github/castorm/kafka/connect/http/response/jackson/JacksonRecordParser.java +++ b/kafka-connect-http/src/main/java/com/github/castorm/kafka/connect/http/response/jackson/JacksonRecordParser.java @@ -24,16 +24,13 @@ import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; import lombok.RequiredArgsConstructor; -import lombok.SneakyThrows; import org.apache.kafka.common.Configurable; -import java.io.IOException; import java.util.List; import java.util.Map; import java.util.Map.Entry; import java.util.Optional; import java.util.function.Function; -import java.util.stream.Stream; import static java.util.stream.Collectors.joining; import static java.util.stream.Collectors.toMap; @@ -43,73 +40,59 @@ public class JacksonRecordParser implements Configurable { private final Function, JacksonRecordParserConfig> configFactory; - private final ObjectMapper objectMapper; + private final JacksonSerializer serializer; - private final JacksonPropertyResolver propertyResolver; - - private JsonPointer recordsPointer; private List keyPointer; private Optional timestampPointer; private Map offsetPointers; private JsonPointer valuePointer; public JacksonRecordParser() { - this(JacksonRecordParserConfig::new, new ObjectMapper(), new JacksonPropertyResolver()); + this(new JacksonSerializer(new ObjectMapper())); + } + + public JacksonRecordParser(JacksonSerializer serializer) { + this(JacksonRecordParserConfig::new, serializer); } @Override public void configure(Map settings) { JacksonRecordParserConfig config = configFactory.apply(settings); - recordsPointer = config.getRecordsPointer(); keyPointer = config.getKeyPointer(); valuePointer = config.getValuePointer(); offsetPointers = config.getOffsetPointers(); timestampPointer = config.getTimestampPointer(); } - Stream getRecords(byte[] body) { - return propertyResolver.getArrayAt(deserialize(body), recordsPointer); - } - - @Deprecated - /* - Replaced by Offset + /** + * @deprecated Replaced by Offset */ + @Deprecated Optional getKey(JsonNode node) { String key = keyPointer.stream() - .map(pointer -> propertyResolver.getObjectAt(node, pointer).asText()) + .map(pointer -> serializer.getObjectAt(node, pointer).asText()) .filter(it -> !it.isEmpty()) .collect(joining("+")); return key.isEmpty() ? Optional.empty() : Optional.of(key); } - @Deprecated - /* - Replaced by Offset + /** + * @deprecated Replaced by Offset */ + @Deprecated Optional getTimestamp(JsonNode node) { - return timestampPointer.map(pointer -> propertyResolver.getObjectAt(node, pointer).asText()); + return timestampPointer.map(pointer -> serializer.getObjectAt(node, pointer).asText()); } - Map getOffsets(JsonNode node) { + Map getOffset(JsonNode node) { return offsetPointers.entrySet().stream() - .collect(toMap(Entry::getKey, entry -> propertyResolver.getObjectAt(node, entry.getValue()).asText())); + .collect(toMap(Entry::getKey, entry -> serializer.getObjectAt(node, entry.getValue()).asText())); } String getValue(JsonNode node) { - JsonNode value = propertyResolver.getObjectAt(node, valuePointer); - - return value.isObject() ? serialize(value) : value.asText(); - } - - @SneakyThrows(IOException.class) - private JsonNode deserialize(byte[] body) { - return objectMapper.readTree(body); - } + JsonNode value = serializer.getObjectAt(node, valuePointer); - @SneakyThrows(IOException.class) - private String serialize(JsonNode node) { - return objectMapper.writeValueAsString(node); + return value.isObject() ? serializer.serialize(value) : value.asText(); } } diff --git a/kafka-connect-http/src/main/java/com/github/castorm/kafka/connect/http/response/jackson/JacksonResponseRecordParser.java b/kafka-connect-http/src/main/java/com/github/castorm/kafka/connect/http/response/jackson/JacksonResponseRecordParser.java new file mode 100644 index 00000000..6caff747 --- /dev/null +++ b/kafka-connect-http/src/main/java/com/github/castorm/kafka/connect/http/response/jackson/JacksonResponseRecordParser.java @@ -0,0 +1,84 @@ +package com.github.castorm.kafka.connect.http.response.jackson; + +/*- + * #%L + * kafka-connect-http + * %% + * Copyright (C) 2020 CastorM + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * #L% + */ + +import com.fasterxml.jackson.core.JsonPointer; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.github.castorm.kafka.connect.http.response.jackson.model.JacksonRecord; +import lombok.RequiredArgsConstructor; +import org.apache.kafka.common.Configurable; + +import java.util.Map; +import java.util.function.Function; +import java.util.stream.Stream; + +import static com.github.castorm.kafka.connect.common.CollectionUtils.merge; +import static java.util.Collections.emptyMap; + +@RequiredArgsConstructor +public class JacksonResponseRecordParser implements Configurable { + + private final Function, JacksonRecordParserConfig> configFactory; + + private final JacksonRecordParser recordParser; + + private final JacksonSerializer serializer; + + private JsonPointer recordsPointer; + + public JacksonResponseRecordParser() { + this(new JacksonRecordParser(), new JacksonSerializer(new ObjectMapper())); + } + + public JacksonResponseRecordParser(JacksonRecordParser recordParser, JacksonSerializer serializer) { + this(JacksonRecordParserConfig::new, recordParser, serializer); + } + + @Override + public void configure(Map settings) { + JacksonRecordParserConfig config = configFactory.apply(settings); + recordsPointer = config.getRecordsPointer(); + } + + Stream getRecords(byte[] body) { + + JsonNode jsonBody = serializer.deserialize(body); + + Map responseOffset = getResponseOffset(jsonBody); + + return serializer.getArrayAt(jsonBody, recordsPointer) + .map(jsonRecord -> toJacksonRecord(jsonRecord, responseOffset)); + } + + private Map getResponseOffset(JsonNode node) { + return emptyMap(); + } + + private JacksonRecord toJacksonRecord(JsonNode jsonRecord, Map responseOffset) { + return JacksonRecord.builder() + .key(recordParser.getKey(jsonRecord).orElse(null)) + .timestamp(recordParser.getTimestamp(jsonRecord).orElse(null)) + .offset(merge(responseOffset, recordParser.getOffset(jsonRecord))) + .body(recordParser.getValue(jsonRecord)) + .build(); + } +} diff --git a/kafka-connect-http/src/main/java/com/github/castorm/kafka/connect/http/response/jackson/JacksonPropertyResolver.java b/kafka-connect-http/src/main/java/com/github/castorm/kafka/connect/http/response/jackson/JacksonSerializer.java similarity index 72% rename from kafka-connect-http/src/main/java/com/github/castorm/kafka/connect/http/response/jackson/JacksonPropertyResolver.java rename to kafka-connect-http/src/main/java/com/github/castorm/kafka/connect/http/response/jackson/JacksonSerializer.java index fc8706bd..f99bb3ba 100644 --- a/kafka-connect-http/src/main/java/com/github/castorm/kafka/connect/http/response/jackson/JacksonPropertyResolver.java +++ b/kafka-connect-http/src/main/java/com/github/castorm/kafka/connect/http/response/jackson/JacksonSerializer.java @@ -9,9 +9,9 @@ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -22,16 +22,37 @@ import com.fasterxml.jackson.core.JsonPointer; import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import lombok.RequiredArgsConstructor; +import lombok.SneakyThrows; +import java.io.IOException; import java.util.stream.Stream; import static com.fasterxml.jackson.core.JsonPointer.compile; import static java.util.stream.StreamSupport.stream; -class JacksonPropertyResolver { +@RequiredArgsConstructor +class JacksonSerializer { private static final JsonPointer JSON_ROOT = compile("/"); + private final ObjectMapper objectMapper; + + public JacksonSerializer() { + this(new ObjectMapper()); + } + + @SneakyThrows(IOException.class) + JsonNode deserialize(byte[] body) { + return objectMapper.readTree(body); + } + + @SneakyThrows(IOException.class) + String serialize(JsonNode node) { + return objectMapper.writeValueAsString(node); + } + JsonNode getObjectAt(JsonNode node, JsonPointer pointer) { return getRequiredAt(node, pointer); } diff --git a/kafka-connect-http/src/main/java/com/github/castorm/kafka/connect/http/response/jackson/model/JacksonRecord.java b/kafka-connect-http/src/main/java/com/github/castorm/kafka/connect/http/response/jackson/model/JacksonRecord.java new file mode 100644 index 00000000..fdfb1ff7 --- /dev/null +++ b/kafka-connect-http/src/main/java/com/github/castorm/kafka/connect/http/response/jackson/model/JacksonRecord.java @@ -0,0 +1,53 @@ +package com.github.castorm.kafka.connect.http.response.jackson.model; + +/*- + * #%L + * Kafka Connect HTTP + * %% + * Copyright (C) 2020 - 2021 Cástor Rodríguez + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * #L% + */ + +import lombok.Builder; +import lombok.Builder.Default; +import lombok.Value; +import lombok.With; + +import java.util.Map; + +import static java.util.Collections.emptyMap; + +@With +@Value +@Builder +public class JacksonRecord { + + /** + * @deprecated To be integrated in offset + */ + @Deprecated + String key; + + /** + * @deprecated To be integrated in offset + */ + @Deprecated + String timestamp; + + @Default + Map offset = emptyMap(); + + String body; +} diff --git a/kafka-connect-http/src/test/java/com/github/castorm/kafka/connect/common/CollectionUtilsTest.java b/kafka-connect-http/src/test/java/com/github/castorm/kafka/connect/common/CollectionUtilsTest.java new file mode 100644 index 00000000..6145ddba --- /dev/null +++ b/kafka-connect-http/src/test/java/com/github/castorm/kafka/connect/common/CollectionUtilsTest.java @@ -0,0 +1,62 @@ +package com.github.castorm.kafka.connect.common; + +/*- + * #%L + * Kafka Connect HTTP + * %% + * Copyright (C) 2020 - 2021 Cástor Rodríguez + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * #L% + */ + +import com.google.common.collect.ImmutableMap; +import org.junit.jupiter.api.Test; + +import static com.github.castorm.kafka.connect.common.CollectionUtils.merge; +import static com.github.castorm.kafka.connect.common.CollectionUtilsTest.Fixture.map1; +import static com.github.castorm.kafka.connect.common.CollectionUtilsTest.Fixture.map1and3; +import static com.github.castorm.kafka.connect.common.CollectionUtilsTest.Fixture.map2; +import static com.github.castorm.kafka.connect.common.CollectionUtilsTest.Fixture.map3; +import static java.util.Collections.emptyMap; +import static org.assertj.core.api.Assertions.assertThat; + +class CollectionUtilsTest { + + @Test + void whenMergeMapWithEmpty_thenMap() { + assertThat(merge(map1, emptyMap())).isEqualTo(map1); + } + + @Test + void whenMergeEmptyWithMap_thenMap() { + assertThat(merge(emptyMap(), map1)).isEqualTo(map1); + } + + @Test + void whenMergeMapSameKey_thenLastValueWins() { + assertThat(merge(map1, map2)).isEqualTo(map2); + } + + @Test + void whenMergeMapDiffKey_thenCombined() { + assertThat(merge(map1, map3)).isEqualTo(map1and3); + } + + interface Fixture { + ImmutableMap map1 = ImmutableMap.of("k1", "v1"); + ImmutableMap map2 = ImmutableMap.of("k1", "v2"); + ImmutableMap map3 = ImmutableMap.of("k2", "v3"); + ImmutableMap map1and3 = ImmutableMap.of("k1", "v1", "k2", "v3"); + } +} diff --git a/kafka-connect-http/src/test/java/com/github/castorm/kafka/connect/http/response/jackson/JacksonKvRecordHttpResponseParserConfigTest.java b/kafka-connect-http/src/test/java/com/github/castorm/kafka/connect/http/response/jackson/JacksonKvRecordHttpResponseParserConfigTest.java index 574ccf9c..f1b96661 100644 --- a/kafka-connect-http/src/test/java/com/github/castorm/kafka/connect/http/response/jackson/JacksonKvRecordHttpResponseParserConfigTest.java +++ b/kafka-connect-http/src/test/java/com/github/castorm/kafka/connect/http/response/jackson/JacksonKvRecordHttpResponseParserConfigTest.java @@ -9,9 +9,9 @@ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -35,13 +35,13 @@ class JacksonKvRecordHttpResponseParserConfigTest { @Test void whenItemsParserClassConfigured_thenInitialized() { - assertThat(config(ImmutableMap.of("http.response.record.parser", "com.github.castorm.kafka.connect.http.response.jackson.")).getRecordParser()) - .isInstanceOf(JacksonRecordParser.class); + assertThat(config(ImmutableMap.of("http.response.record.parser", "com.github.castorm.kafka.connect.http.response.jackson.")).getResponseParser()) + .isInstanceOf(JacksonResponseRecordParser.class); } @Test void whenMissingItemParserClassConfigured_thenInitialized() { - assertThat(config(emptyMap()).getRecordParser()).isInstanceOf(JacksonRecordParser.class); + assertThat(config(emptyMap()).getResponseParser()).isInstanceOf(JacksonResponseRecordParser.class); } @Test diff --git a/kafka-connect-http/src/test/java/com/github/castorm/kafka/connect/http/response/jackson/JacksonKvRecordHttpResponseParserTest.java b/kafka-connect-http/src/test/java/com/github/castorm/kafka/connect/http/response/jackson/JacksonKvRecordHttpResponseParserTest.java index 2c47d7bf..92deb12b 100644 --- a/kafka-connect-http/src/test/java/com/github/castorm/kafka/connect/http/response/jackson/JacksonKvRecordHttpResponseParserTest.java +++ b/kafka-connect-http/src/test/java/com/github/castorm/kafka/connect/http/response/jackson/JacksonKvRecordHttpResponseParserTest.java @@ -20,10 +20,10 @@ * #L% */ -import com.fasterxml.jackson.databind.JsonNode; import com.github.castorm.kafka.connect.http.model.HttpResponse; import com.github.castorm.kafka.connect.http.model.Offset; import com.github.castorm.kafka.connect.http.record.model.KvRecord; +import com.github.castorm.kafka.connect.http.response.jackson.model.JacksonRecord; import com.github.castorm.kafka.connect.http.response.timestamp.spi.TimestampParser; import com.google.common.collect.ImmutableMap; import org.junit.jupiter.api.BeforeEach; @@ -37,6 +37,7 @@ import java.util.stream.Stream; import static com.github.castorm.kafka.connect.http.response.jackson.JacksonKvRecordHttpResponseParserTest.Fixture.bytes; +import static com.github.castorm.kafka.connect.http.response.jackson.JacksonKvRecordHttpResponseParserTest.Fixture.record; import static com.github.castorm.kafka.connect.http.response.jackson.JacksonKvRecordHttpResponseParserTest.Fixture.response; import static com.github.castorm.kafka.connect.http.response.jackson.JacksonKvRecordHttpResponseParserTest.Fixture.timestamp; import static com.github.castorm.kafka.connect.http.response.jackson.JacksonKvRecordHttpResponseParserTest.Fixture.timestampIso; @@ -48,7 +49,6 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.BDDMockito.given; -import static org.mockito.Mockito.lenient; @ExtendWith(MockitoExtension.class) class JacksonKvRecordHttpResponseParserTest { @@ -59,20 +59,16 @@ class JacksonKvRecordHttpResponseParserTest { JacksonKvRecordHttpResponseParserConfig config; @Mock - JacksonRecordParser recordParser; + JacksonResponseRecordParser responseParser; @Mock TimestampParser timestampParser; - @Mock - JsonNode record; - @BeforeEach void setUp() { parser = new JacksonKvRecordHttpResponseParser(__ -> config); - given(config.getRecordParser()).willReturn(recordParser); + given(config.getResponseParser()).willReturn(responseParser); given(config.getTimestampParser()).willReturn(timestampParser); - lenient().when(record.toString()).thenReturn("Binary Value"); parser.configure(emptyMap()); } @@ -87,8 +83,7 @@ void givenNoItems_thenEmpty() { @Test void givenOneItem_thenKeyMapped() { - givenRecords(Stream.of(record)); - given(recordParser.getKey(record)).willReturn(Optional.of("key")); + givenRecords(Stream.of(record.withKey("key"))); assertThat(parser.parse(response)).first().extracting(KvRecord::getKey).isEqualTo("key"); } @@ -96,9 +91,7 @@ void givenOneItem_thenKeyMapped() { @Test void givenOneItemWithNoKeyButOffset_thenKeyMappedFromOffset() { - givenRecords(Stream.of(record)); - given(recordParser.getKey(record)).willReturn(Optional.empty()); - given(recordParser.getOffsets(record)).willReturn(ImmutableMap.of("key", "value")); + givenRecords(Stream.of(record.withKey(null).withOffset(ImmutableMap.of("key", "value")))); assertThat(parser.parse(response)).first().extracting(KvRecord::getKey).isEqualTo("value"); } @@ -106,8 +99,7 @@ void givenOneItemWithNoKeyButOffset_thenKeyMappedFromOffset() { @Test void givenOneItemWithNoNoKey_thenKeyDefault() { - givenRecords(Stream.of(record)); - given(recordParser.getKey(record)).willReturn(Optional.empty()); + givenRecords(Stream.of(record.withKey(null))); assertThat(parser.parse(response)).first().extracting(KvRecord::getKey).isNotNull(); } @@ -115,8 +107,7 @@ void givenOneItemWithNoNoKey_thenKeyDefault() { @Test void givenOneItem_thenValueMapped() { - givenRecords(Stream.of(record)); - given(recordParser.getValue(record)).willReturn("value"); + givenRecords(Stream.of(record.withBody("value"))); assertThat(parser.parse(response)).first().extracting(KvRecord::getValue).isEqualTo("value"); } @@ -124,8 +115,7 @@ void givenOneItem_thenValueMapped() { @Test void givenOneItem_thenTimestampMapped() { - givenRecords(Stream.of(record)); - given(recordParser.getTimestamp(record)).willReturn(Optional.of(timestampIso)); + givenRecords(Stream.of(record.withTimestamp(timestampIso))); given(timestampParser.parse(timestampIso)).willReturn(timestamp); assertThat(parser.parse(response)).first().extracting(KvRecord::getOffset).extracting(Offset::getTimestamp).isEqualTo(Optional.of(timestamp)); @@ -134,9 +124,7 @@ void givenOneItem_thenTimestampMapped() { @Test void givenOneItemWitNoTimestampButOffset_thenTimestampMappedFromOffset() { - givenRecords(Stream.of(record)); - given(recordParser.getTimestamp(record)).willReturn(Optional.empty()); - given(recordParser.getOffsets(record)).willReturn(ImmutableMap.of("timestamp", timestampIso)); + givenRecords(Stream.of(record.withTimestamp(null).withOffset(ImmutableMap.of("timestamp", timestampIso)))); given(timestampParser.parse(timestampIso)).willReturn(timestamp); assertThat(parser.parse(response)).first().extracting(KvRecord::getOffset).extracting(Offset::getTimestamp).isEqualTo(Optional.of(timestamp)); @@ -145,8 +133,7 @@ void givenOneItemWitNoTimestampButOffset_thenTimestampMappedFromOffset() { @Test void givenOneItemWithNoTimestamp_thenDefault() { - givenRecords(Stream.of(record)); - given(recordParser.getTimestamp(record)).willReturn(Optional.empty()); + givenRecords(Stream.of(record.withTimestamp(null))); assertThat(parser.parse(response)).first().extracting(KvRecord::getOffset).extracting(Offset::getTimestamp).isNotNull(); } @@ -154,8 +141,7 @@ void givenOneItemWithNoTimestamp_thenDefault() { @Test void givenOneItem_thenOffsetMapped() { - givenRecords(Stream.of(record)); - given(recordParser.getOffsets(record)).willReturn(ImmutableMap.of("offset-key", "offset-value")); + givenRecords(Stream.of(record.withOffset(ImmutableMap.of("offset-key", "offset-value")))); assertThat(parser.parse(response).stream().findFirst().get().getOffset().toMap().get("offset-key")).isEqualTo("offset-value"); } @@ -163,9 +149,7 @@ void givenOneItem_thenOffsetMapped() { @Test void givenOneItem_thenTimestampMappedToOffset() { - givenRecords(Stream.of(record)); - given(recordParser.getOffsets(record)).willReturn(emptyMap()); - given(recordParser.getTimestamp(record)).willReturn(Optional.of(timestampIso)); + givenRecords(Stream.of(record.withTimestamp(timestampIso).withOffset(emptyMap()))); given(timestampParser.parse(timestampIso)).willReturn(timestamp); assertThat(parser.parse(response).stream().findFirst().get().getOffset().getTimestamp()).contains(parse(timestampIso)); @@ -174,9 +158,7 @@ void givenOneItem_thenTimestampMappedToOffset() { @Test void givenOneItem_thenKeyMappedToOffset() { - givenRecords(Stream.of(record)); - given(recordParser.getOffsets(record)).willReturn(emptyMap()); - given(recordParser.getKey(record)).willReturn(Optional.of("value")); + givenRecords(Stream.of(record.withKey("value").withOffset(emptyMap()))); assertThat(parser.parse(response).stream().findFirst().get().getOffset().getKey()).contains("value"); } @@ -184,15 +166,13 @@ void givenOneItem_thenKeyMappedToOffset() { @Test void givenOneItemWithNoKey_thenConsistentUUIDMappedToOffset() { - givenRecords(Stream.of(record)); - given(recordParser.getOffsets(record)).willReturn(emptyMap()); - given(recordParser.getKey(record)).willReturn(Optional.empty()); + givenRecords(Stream.of(record.withKey(null).withOffset(emptyMap()))); - assertThat(parser.parse(response).stream().findFirst().get().getOffset().getKey()).contains(nameUUIDFromBytes(record.toString().getBytes()).toString()); + assertThat(parser.parse(response).stream().findFirst().get().getOffset().getKey()).contains(nameUUIDFromBytes(record.getBody().toString().getBytes()).toString()); } - private void givenRecords(Stream records) { - given(recordParser.getRecords(eq(bytes))).willReturn(records); + private void givenRecords(Stream records) { + given(responseParser.getRecords(bytes)).willReturn(records); } interface Fixture { @@ -200,5 +180,6 @@ interface Fixture { HttpResponse response = HttpResponse.builder().body(bytes).build(); Instant timestamp = ofEpochMilli(43L); String timestampIso = timestamp.toString(); + JacksonRecord record = JacksonRecord.builder().body("Binary Value").build(); } } diff --git a/kafka-connect-http/src/test/java/com/github/castorm/kafka/connect/http/response/jackson/JacksonRecordParserTest.java b/kafka-connect-http/src/test/java/com/github/castorm/kafka/connect/http/response/jackson/JacksonRecordParserTest.java index 45819a69..54c8cf71 100644 --- a/kafka-connect-http/src/test/java/com/github/castorm/kafka/connect/http/response/jackson/JacksonRecordParserTest.java +++ b/kafka-connect-http/src/test/java/com/github/castorm/kafka/connect/http/response/jackson/JacksonRecordParserTest.java @@ -9,9 +9,9 @@ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -32,20 +32,20 @@ import org.mockito.junit.jupiter.MockitoExtension; import java.util.Optional; -import java.util.stream.Stream; import static com.fasterxml.jackson.core.JsonPointer.compile; import static com.github.castorm.kafka.connect.http.response.jackson.JacksonRecordParserTest.Fixture.deserialize; import static com.github.castorm.kafka.connect.http.response.jackson.JacksonRecordParserTest.Fixture.item1; -import static com.github.castorm.kafka.connect.http.response.jackson.JacksonRecordParserTest.Fixture.item2; -import static com.github.castorm.kafka.connect.http.response.jackson.JacksonRecordParserTest.Fixture.itemArray; -import static com.github.castorm.kafka.connect.http.response.jackson.JacksonRecordParserTest.Fixture.mapper; -import static com.github.castorm.kafka.connect.http.response.jackson.JacksonRecordParserTest.Fixture.pointer; -import static com.github.castorm.kafka.connect.http.response.jackson.JacksonRecordParserTest.Fixture.property; +import static com.github.castorm.kafka.connect.http.response.jackson.JacksonRecordParserTest.Fixture.jsonK1K2; +import static com.github.castorm.kafka.connect.http.response.jackson.JacksonRecordParserTest.Fixture.pointerToK1; +import static com.github.castorm.kafka.connect.http.response.jackson.JacksonRecordParserTest.Fixture.pointerToK2; +import static com.github.castorm.kafka.connect.http.response.jackson.JacksonRecordParserTest.Fixture.v1; +import static com.github.castorm.kafka.connect.http.response.jackson.JacksonRecordParserTest.Fixture.v2; import static java.util.Arrays.asList; import static java.util.Collections.emptyList; import static java.util.Collections.emptyMap; import static java.util.Collections.singletonList; +import static java.util.Optional.empty; import static org.assertj.core.api.Assertions.assertThat; import static org.mockito.BDDMockito.given; @@ -57,22 +57,9 @@ class JacksonRecordParserTest { @Mock JacksonRecordParserConfig config; - @Mock - JacksonPropertyResolver propertyResolver; - @BeforeEach void setUp() { - parser = new JacksonRecordParser(__ -> config, mapper, propertyResolver); - } - - @Test - void givenPointer_whenGetItemsArray_thenAllReturned() { - - given(config.getRecordsPointer()).willReturn(pointer); - given(propertyResolver.getArrayAt(deserialize(itemArray), pointer)).willReturn(Stream.of(deserialize(item1), deserialize(item2))); - parser.configure(emptyMap()); - - assertThat(parser.getRecords(itemArray.getBytes())).containsExactly(deserialize(item1), deserialize(item2)); + parser = new JacksonRecordParser(__ -> config, new JacksonSerializer()); } @Test @@ -81,87 +68,82 @@ void givenNoPointer_whenGetKey_thenEmpty() { given(config.getKeyPointer()).willReturn(emptyList()); parser.configure(emptyMap()); - assertThat(parser.getKey(deserialize(item1))).isEmpty(); + assertThat(parser.getKey(jsonK1K2)).isEmpty(); } @Test void givenPointer_whenGetKey_thenKeyAsText() { - given(config.getKeyPointer()).willReturn(singletonList(pointer)); - given(propertyResolver.getObjectAt(deserialize(item1), pointer)).willReturn(deserialize(item1).at("/k1")); + given(config.getKeyPointer()).willReturn(singletonList(pointerToK1)); parser.configure(emptyMap()); - assertThat(parser.getKey(deserialize(item1))).contains(property); + assertThat(parser.getKey(jsonK1K2)).contains(v1); } @Test void givenPointers_whenGetKey_thenKeyAsTextConcatenated() { - JsonNode node = deserialize(item1); - given(config.getKeyPointer()).willReturn(asList(pointer, pointer)); - given(propertyResolver.getObjectAt(node, pointer)).willReturn(node.at("/k1")); + given(config.getKeyPointer()).willReturn(asList(pointerToK1, pointerToK2)); parser.configure(emptyMap()); - assertThat(parser.getKey(node)).contains(property + "+" + property); + assertThat(parser.getKey(jsonK1K2)).contains(v1 + "+" + v2); } @Test void givenPointer_whenGetValueText_thenValue() { - given(config.getValuePointer()).willReturn(pointer); - given(propertyResolver.getObjectAt(deserialize(item1), pointer)).willReturn(deserialize(item1).at("/k1")); + given(config.getValuePointer()).willReturn(pointerToK1); parser.configure(emptyMap()); - assertThat(parser.getValue(deserialize(item1))).isEqualTo(property); + assertThat(parser.getValue(jsonK1K2)).isEqualTo(v1); } @Test void givenPointer_whenGetValueObject_thenValue() { - given(config.getValuePointer()).willReturn(pointer); - given(propertyResolver.getObjectAt(deserialize(item1), pointer)).willReturn(deserialize(item1)); + given(config.getValuePointer()).willReturn(pointerToK1); parser.configure(emptyMap()); - assertThat(parser.getValue(deserialize(item1))).isEqualTo(item1); + assertThat(parser.getValue(jsonK1K2)).isEqualTo(v1); } @Test void givenNoPointer_whenGetTimestamp_thenEmpty() { - given(config.getTimestampPointer()).willReturn(Optional.empty()); + given(config.getTimestampPointer()).willReturn(empty()); parser.configure(emptyMap()); - assertThat(parser.getTimestamp(deserialize(item1))).isEmpty(); + assertThat(parser.getTimestamp(jsonK1K2)).isEmpty(); } @Test - void givenPointer_whenGetTimestamp_thenKeyAsText() { + void givenPointer_whenGetTimestamp_thenTimestampAsText() { - given(config.getTimestampPointer()).willReturn(Optional.of(pointer)); - given(propertyResolver.getObjectAt(deserialize(item1), pointer)).willReturn(deserialize(item1)); + given(config.getTimestampPointer()).willReturn(Optional.of(pointerToK1)); parser.configure(emptyMap()); - assertThat(parser.getTimestamp(deserialize(item1))).contains(deserialize(item1).asText()); + assertThat(parser.getTimestamp(jsonK1K2)).contains(v1); } @Test void givenPointer_whenGetOffset_thenOffset() { - given(config.getOffsetPointers()).willReturn(ImmutableMap.of("key", pointer)); - given(propertyResolver.getObjectAt(deserialize(item1), pointer)).willReturn(deserialize(item1)); + given(config.getOffsetPointers()).willReturn(ImmutableMap.of("key", pointerToK1)); parser.configure(emptyMap()); - assertThat(parser.getOffsets(deserialize(item1))).isEqualTo(ImmutableMap.of("key", deserialize(item1).asText())); + assertThat(parser.getOffset(jsonK1K2)).isEqualTo(ImmutableMap.of("key", v1)); } interface Fixture { ObjectMapper mapper = new ObjectMapper(); - String property = "v1"; - String item1 = "{\"k1\":\"" + property + "\"}"; - String item2 = "{\"k2\":\"v2\"}"; - String itemArray = "{\"items\":[" + item1 + "," + item2 + "]}"; - String itemNested = "{\"items\":" + item1 + "}"; - JsonPointer pointer = compile("/items"); + String k1 = "k1"; + String v1 = "v1"; + String k2 = "k2"; + String v2 = "v2"; + String item1 = "{\"" + k1 + "\":\"" + v1 + "\",\"" + k2 + "\":\"" + v2 + "\"}"; + JsonNode jsonK1K2 = deserialize(item1); + JsonPointer pointerToK1 = compile("/" + k1); + JsonPointer pointerToK2 = compile("/" + k2); @SneakyThrows static JsonNode deserialize(String body) { diff --git a/kafka-connect-http/src/test/java/com/github/castorm/kafka/connect/http/response/jackson/JacksonResponseRecordParserTest.java b/kafka-connect-http/src/test/java/com/github/castorm/kafka/connect/http/response/jackson/JacksonResponseRecordParserTest.java new file mode 100644 index 00000000..e3c3ff0e --- /dev/null +++ b/kafka-connect-http/src/test/java/com/github/castorm/kafka/connect/http/response/jackson/JacksonResponseRecordParserTest.java @@ -0,0 +1,153 @@ +package com.github.castorm.kafka.connect.http.response.jackson; + +/*- + * #%L + * Kafka Connect HTTP + * %% + * Copyright (C) 2020 - 2021 Cástor Rodríguez + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * #L% + */ + +import com.fasterxml.jackson.core.JsonPointer; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.collect.ImmutableMap; +import lombok.SneakyThrows; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; + +import java.util.Optional; +import java.util.stream.Stream; + +import static com.fasterxml.jackson.core.JsonPointer.compile; +import static com.github.castorm.kafka.connect.http.response.jackson.JacksonRecordParserTest.Fixture.deserialize; +import static com.github.castorm.kafka.connect.http.response.jackson.JacksonRecordParserTest.Fixture.item1; +import static com.github.castorm.kafka.connect.http.response.jackson.JacksonResponseRecordParserTest.Fixture.item2; +import static com.github.castorm.kafka.connect.http.response.jackson.JacksonResponseRecordParserTest.Fixture.itemArray; +import static com.github.castorm.kafka.connect.http.response.jackson.JacksonResponseRecordParserTest.Fixture.itemArrayJson; +import static com.github.castorm.kafka.connect.http.response.jackson.JacksonResponseRecordParserTest.Fixture.pointer; +import static java.util.Collections.emptyMap; +import static java.util.stream.Collectors.toList; +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.BDDMockito.given; + +@ExtendWith(MockitoExtension.class) +class JacksonResponseRecordParserTest { + + JacksonResponseRecordParser parser; + + @Mock + JacksonRecordParserConfig config; + + @Mock + JacksonRecordParser recordParser; + + @Mock + JacksonSerializer serializer; + + @BeforeEach + void setUp() { + parser = new JacksonResponseRecordParser(__ -> config, recordParser, serializer); + + given(recordParser.getKey(deserialize(item1))).willReturn(Optional.of("value")); + } + + @Test + void givenRecords_whenGetRecords_thenAllReturned() { + + givenRecords(deserialize(item1), deserialize(item2)); + + assertThat(parser.getRecords(itemArray.getBytes()).collect(toList())).hasSize(2); + } + + @Test + void givenRecordWithKey_whenGetRecords_thenItemKeyMapped() { + + givenRecords(deserialize(item1)); + given(recordParser.getKey(deserialize(item1))).willReturn(Optional.of("value")); + + assertThat(parser.getRecords(itemArray.getBytes()).findFirst().get().getKey()).isEqualTo("value"); + } + + @Test + void givenRecordWithoutKey_whenGetRecords_thenItemKeyNull() { + + givenRecords(deserialize(item1)); + given(recordParser.getKey(deserialize(item1))).willReturn(Optional.empty()); + + assertThat(parser.getRecords(itemArray.getBytes()).findFirst().get().getKey()).isNull(); + } + + @Test + void givenRecordWithTimestamp_whenGetRecords_thenItemTimestampMapped() { + + givenRecords(deserialize(item1)); + given(recordParser.getTimestamp(deserialize(item1))).willReturn(Optional.of("value")); + + assertThat(parser.getRecords(itemArray.getBytes()).findFirst().get().getTimestamp()).isEqualTo("value"); + } + + @Test + void givenRecordWithoutTimestamp_whenGetRecords_thenItemTimestampNull() { + + givenRecords(deserialize(item1)); + given(recordParser.getTimestamp(deserialize(item1))).willReturn(Optional.empty()); + + assertThat(parser.getRecords(itemArray.getBytes()).findFirst().get().getTimestamp()).isNull(); + } + + @Test + void givenRecordWithOffset_whenGetRecords_thenItemOffsetMapped() { + + givenRecords(deserialize(item1)); + given(recordParser.getOffset(deserialize(item1))).willReturn(ImmutableMap.of("k", "v")); + + assertThat(parser.getRecords(itemArray.getBytes()).findFirst().get().getOffset()).isEqualTo(ImmutableMap.of("k", "v")); + } + + @Test + void givenRecordWithValue_whenGetRecords_thenItemBodyMapped() { + + givenRecords(deserialize(item1)); + given(recordParser.getValue(deserialize(item1))).willReturn("value"); + + assertThat(parser.getRecords(itemArray.getBytes()).findFirst().get().getBody()).isEqualTo("value"); + } + + private void givenRecords(JsonNode... records) { + given(config.getRecordsPointer()).willReturn(pointer); + given(serializer.deserialize(itemArray.getBytes())).willReturn(itemArrayJson); + given(serializer.getArrayAt(itemArrayJson, pointer)).willReturn(Stream.of(records)); + parser.configure(emptyMap()); + } + + interface Fixture { + ObjectMapper mapper = new ObjectMapper(); + String property = "v1"; + String item1 = "{\"k1\":\"" + property + "\"}"; + String item2 = "{\"k2\":\"v2\"}"; + String itemArray = "{\"items\":[" + item1 + "," + item2 + "]}"; + JsonNode itemArrayJson = deserialize(itemArray); + JsonPointer pointer = compile("/items"); + + @SneakyThrows + static JsonNode deserialize(String body) { + return mapper.readTree(body); + } + } +} diff --git a/kafka-connect-http/src/test/java/com/github/castorm/kafka/connect/http/response/jackson/JacksonPropertyResolverTest.java b/kafka-connect-http/src/test/java/com/github/castorm/kafka/connect/http/response/jackson/JacksonSerializerTest.java similarity index 55% rename from kafka-connect-http/src/test/java/com/github/castorm/kafka/connect/http/response/jackson/JacksonPropertyResolverTest.java rename to kafka-connect-http/src/test/java/com/github/castorm/kafka/connect/http/response/jackson/JacksonSerializerTest.java index d3a348cf..e6c9c033 100644 --- a/kafka-connect-http/src/test/java/com/github/castorm/kafka/connect/http/response/jackson/JacksonPropertyResolverTest.java +++ b/kafka-connect-http/src/test/java/com/github/castorm/kafka/connect/http/response/jackson/JacksonSerializerTest.java @@ -9,9 +9,9 @@ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -20,54 +20,82 @@ * #L% */ +import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; import lombok.SneakyThrows; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.InjectMocks; +import org.mockito.Mock; import org.mockito.junit.jupiter.MockitoExtension; +import java.io.IOException; + import static com.fasterxml.jackson.core.JsonPointer.compile; -import static com.github.castorm.kafka.connect.http.response.jackson.JacksonPropertyResolverTest.Fixture.array; -import static com.github.castorm.kafka.connect.http.response.jackson.JacksonPropertyResolverTest.Fixture.deserialize; -import static com.github.castorm.kafka.connect.http.response.jackson.JacksonPropertyResolverTest.Fixture.item1; -import static com.github.castorm.kafka.connect.http.response.jackson.JacksonPropertyResolverTest.Fixture.item2; -import static com.github.castorm.kafka.connect.http.response.jackson.JacksonPropertyResolverTest.Fixture.itemArray; +import static com.github.castorm.kafka.connect.http.response.jackson.JacksonSerializerTest.Fixture.array; +import static com.github.castorm.kafka.connect.http.response.jackson.JacksonSerializerTest.Fixture.deserialize; +import static com.github.castorm.kafka.connect.http.response.jackson.JacksonSerializerTest.Fixture.item1; +import static com.github.castorm.kafka.connect.http.response.jackson.JacksonSerializerTest.Fixture.item1Json; +import static com.github.castorm.kafka.connect.http.response.jackson.JacksonSerializerTest.Fixture.item2; +import static com.github.castorm.kafka.connect.http.response.jackson.JacksonSerializerTest.Fixture.itemArray; import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.BDDMockito.given; @ExtendWith(MockitoExtension.class) -class JacksonPropertyResolverTest { +class JacksonSerializerTest { + + @InjectMocks + JacksonSerializer serializer; + + @Mock + ObjectMapper mapper; + + @Test + void whenSerialize_thenSerializedByMapper() throws JsonProcessingException { + + given(mapper.writeValueAsString(item1Json)).willReturn(item1); - JacksonPropertyResolver resolver = new JacksonPropertyResolver(); + assertThat(serializer.serialize(item1Json)).isEqualTo(item1); + } + + @Test + void whenDeserialize_thenDeserializedByMapper() throws IOException { + + given(mapper.readTree(item1.getBytes())).willReturn(item1Json); + + assertThat(serializer.deserialize(item1.getBytes())).isEqualTo(item1Json); + } @Test void whenGetArrayAtPointerObject_thenObject() { - assertThat(resolver.getArrayAt(deserialize(item1), compile("/"))).containsExactly(deserialize(item1)); + assertThat(serializer.getArrayAt(deserialize(item1), compile("/"))).containsExactly(deserialize(item1)); } @Test void whenGetArrayAtPointerArray_thenAllItems() { - assertThat(resolver.getArrayAt(deserialize(array), compile("/"))).containsExactly(deserialize(item1), deserialize(item2)); + assertThat(serializer.getArrayAt(deserialize(array), compile("/"))).containsExactly(deserialize(item1), deserialize(item2)); } @Test void whenGetArrayAtPointerItems_thenAllItems() { - assertThat(resolver.getArrayAt(deserialize(itemArray), compile("/items"))).containsExactly(deserialize(item1), deserialize(item2)); + assertThat(serializer.getArrayAt(deserialize(itemArray), compile("/items"))).containsExactly(deserialize(item1), deserialize(item2)); } @Test void whenGetObjectAtRoot_thenRoot() { - assertThat(resolver.getObjectAt(deserialize(item1), compile("/"))).isEqualTo(deserialize(item1)); + assertThat(serializer.getObjectAt(deserialize(item1), compile("/"))).isEqualTo(deserialize(item1)); } @Test void whenGetObjectAtProperty_thenProperty() { - assertThat(resolver.getObjectAt(deserialize(item1), compile("/k1"))).isEqualTo(deserialize(item1).at("/k1")); + assertThat(serializer.getObjectAt(deserialize(item1), compile("/k1"))).isEqualTo(deserialize(item1).at("/k1")); } interface Fixture { ObjectMapper mapper = new ObjectMapper(); String item1 = "{\"k1\":\"v1\"}"; + JsonNode item1Json = deserialize(item1); String item2 = "{\"k2\":\"v2\"}"; String array = "[" + item1 + "," + item2 + "]"; String itemArray = "{\"items\":[" + item1 + "," + item2 + "]}";