From 539969bbc90486bc037dc8ecb7ad6996de423dc0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?C=C3=A1stor=20Rodr=C3=ADguez?= Date: Mon, 25 May 2020 09:19:24 +0200 Subject: [PATCH] Provide `BytesKvSourceRecordMapper` to avoid both the envelope wrapping key and value and the escaped encoding of the json document --- CHANGELOG.md | 3 + README.md | 11 +- .../record/BytesKvSourceRecordMapper.java | 76 +++++++++++++ .../BytesKvSourceRecordMapperConfig.java | 54 +++++++++ .../BytesKvSourceRecordMapperConfigTest.java | 67 +++++++++++ .../record/BytesKvSourceRecordMapperTest.java | 106 ++++++++++++++++++ .../StringKvSourceRecordMapperTest.java | 6 +- 7 files changed, 318 insertions(+), 5 deletions(-) create mode 100644 src/main/java/com/github/castorm/kafka/connect/http/record/BytesKvSourceRecordMapper.java create mode 100644 src/main/java/com/github/castorm/kafka/connect/http/record/BytesKvSourceRecordMapperConfig.java create mode 100644 src/test/java/com/github/castorm/kafka/connect/http/record/BytesKvSourceRecordMapperConfigTest.java create mode 100644 src/test/java/com/github/castorm/kafka/connect/http/record/BytesKvSourceRecordMapperTest.java diff --git a/CHANGELOG.md b/CHANGELOG.md index e412573c..7f526158 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,6 +6,9 @@ and this project adheres to [Semantic Versioning](http://semver.org/). ## v0.7 +### v0.7.3 (05/25/2020) +- Provide `BytesKvSourceRecordMapper` to avoid both the envelope wrapping key and value and the escaped encoding of the json document + ### v0.7.2 (05/25/2020) - Provide `StringKvSourceRecordMapper` to avoid the envelope wrapping key and value diff --git a/README.md b/README.md index 39a21d5c..6a4ee70c 100644 --- a/README.md +++ b/README.md @@ -40,7 +40,7 @@ plugins folder. com.github.castorm kafka-connect-http - 0.7.2 + 0.7.3 tar.gz plugin @@ -300,7 +300,14 @@ Parses the HTTP response into a key-value SourceRecord. This process is decompos > * `com.github.castorm.kafka.connect.http.record.SchemedKvSourceRecordMapper` Maps **key** to a *Struct schema* > with a single property `key`, and **value** to a *Struct schema* with a single property `value` > * `com.github.castorm.kafka.connect.http.record.StringKvSourceRecordMapper` Maps both **key** and **value** to -> a string schema with a single value +> a `String` schema +> * `com.github.castorm.kafka.connect.http.record.BytesKvSourceRecordMapper` Maps both **key** and **value** to +> a `byte[]` schema in a configurable charset +> +> ##### `http.response.record.mapper.charset` +> Charset use when `BytesKvSourceRecordMapper`. +> * Type: String +> * Default: `UTF-8` ##### Parsing a HttpResponse with JacksonKvRecordHttpResponseParser Uses [Jackson](https://github.com/FasterXML/jackson) to look for the records in the response. diff --git a/src/main/java/com/github/castorm/kafka/connect/http/record/BytesKvSourceRecordMapper.java b/src/main/java/com/github/castorm/kafka/connect/http/record/BytesKvSourceRecordMapper.java new file mode 100644 index 00000000..fc370004 --- /dev/null +++ b/src/main/java/com/github/castorm/kafka/connect/http/record/BytesKvSourceRecordMapper.java @@ -0,0 +1,76 @@ +package com.github.castorm.kafka.connect.http.record; + +/*- + * #%L + * kafka-connect-http + * %% + * Copyright (C) 2020 CastorM + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * #L% + */ + +import com.github.castorm.kafka.connect.http.model.Offset; +import com.github.castorm.kafka.connect.http.record.model.KvRecord; +import com.github.castorm.kafka.connect.http.record.spi.KvSourceRecordMapper; +import lombok.RequiredArgsConstructor; +import org.apache.kafka.connect.data.Schema; +import org.apache.kafka.connect.source.SourceRecord; + +import java.nio.charset.Charset; +import java.util.Map; +import java.util.function.Function; + +import static java.util.Collections.emptyMap; +import static org.apache.kafka.connect.data.SchemaBuilder.bytes; + +@RequiredArgsConstructor +public class BytesKvSourceRecordMapper implements KvSourceRecordMapper { + + private static Map sourcePartition = emptyMap(); + + private static final Schema keySchema = bytes().build(); + + private static final Schema valueSchema = bytes().build(); + + private final Function, BytesKvSourceRecordMapperConfig> configFactory; + + private BytesKvSourceRecordMapperConfig config; + + private Charset charset; + + public BytesKvSourceRecordMapper() { + this(BytesKvSourceRecordMapperConfig::new); + } + + @Override + public void configure(Map settings) { + config = configFactory.apply(settings); + } + + @Override + public SourceRecord map(KvRecord record) { + + Offset offset = record.getOffset(); + return new SourceRecord( + sourcePartition, + offset.toMap(), + config.getTopic(), + null, + keySchema, + record.getKey().getBytes(config.getCharset()), + valueSchema, + record.getValue().getBytes(config.getCharset()), + offset.getTimestamp().toEpochMilli()); + } +} diff --git a/src/main/java/com/github/castorm/kafka/connect/http/record/BytesKvSourceRecordMapperConfig.java b/src/main/java/com/github/castorm/kafka/connect/http/record/BytesKvSourceRecordMapperConfig.java new file mode 100644 index 00000000..6c76a68e --- /dev/null +++ b/src/main/java/com/github/castorm/kafka/connect/http/record/BytesKvSourceRecordMapperConfig.java @@ -0,0 +1,54 @@ +package com.github.castorm.kafka.connect.http.record; + +/*- + * #%L + * kafka-connect-http + * %% + * Copyright (C) 2020 CastorM + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * #L% + */ + +import lombok.Getter; +import org.apache.kafka.common.config.AbstractConfig; +import org.apache.kafka.common.config.ConfigDef; + +import java.nio.charset.Charset; +import java.util.Map; + +import static org.apache.kafka.common.config.ConfigDef.Importance.HIGH; +import static org.apache.kafka.common.config.ConfigDef.Importance.LOW; +import static org.apache.kafka.common.config.ConfigDef.Type.STRING; + +@Getter +public class BytesKvSourceRecordMapperConfig extends AbstractConfig { + + private static final String TOPIC = "kafka.topic"; + private static final String CHARSET = "http.response.record.mapper.charset"; + + private final String topic; + private final Charset charset; + + BytesKvSourceRecordMapperConfig(Map originals) { + super(config(), originals); + topic = getString(TOPIC); + charset = Charset.forName(getString(CHARSET)); + } + + public static ConfigDef config() { + return new ConfigDef() + .define(TOPIC, STRING, HIGH, "Kafka Topic") + .define(CHARSET, STRING, "UTF-8", LOW, "Charset"); + } +} diff --git a/src/test/java/com/github/castorm/kafka/connect/http/record/BytesKvSourceRecordMapperConfigTest.java b/src/test/java/com/github/castorm/kafka/connect/http/record/BytesKvSourceRecordMapperConfigTest.java new file mode 100644 index 00000000..09ba7ac2 --- /dev/null +++ b/src/test/java/com/github/castorm/kafka/connect/http/record/BytesKvSourceRecordMapperConfigTest.java @@ -0,0 +1,67 @@ +package com.github.castorm.kafka.connect.http.record; + +/*- + * #%L + * kafka-connect-http + * %% + * Copyright (C) 2020 CastorM + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * #L% + */ + +import com.google.common.collect.ImmutableMap; +import org.apache.kafka.common.config.ConfigException; +import org.junit.jupiter.api.Test; + +import java.nio.charset.Charset; +import java.util.HashMap; +import java.util.Map; + +import static com.github.castorm.kafka.connect.http.record.BytesKvSourceRecordMapperConfigTest.Fixture.minimumConfig; +import static java.util.Collections.emptyMap; +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.catchThrowable; + +class BytesKvSourceRecordMapperConfigTest { + + @Test + void whenMissingKafkaTopic_thenException() { + assertThat(catchThrowable(() -> new BytesKvSourceRecordMapperConfig(emptyMap()))).isInstanceOf(ConfigException.class); + } + + @Test + void whenKafkaTopic_thenInitialized() { + assertThat(minimumConfig(emptyMap()).getTopic()).isEqualTo("test-topic"); + } + + @Test + void whenMissingCharsetProperty_thenDefault() { + assertThat(minimumConfig(emptyMap()).getCharset()).isEqualTo(Charset.forName("UTF-8")); + } + + @Test + void whenCharsetProperty_thenInitialized() { + assertThat(minimumConfig(ImmutableMap.of("http.response.record.mapper.charset", "US-ASCII")).getCharset()).isEqualTo(Charset.forName("US-ASCII")); + } + + interface Fixture { + + static BytesKvSourceRecordMapperConfig minimumConfig(Map customConfig) { + HashMap finalConfig = new HashMap<>(); + finalConfig.put("kafka.topic", "test-topic"); + finalConfig.putAll(customConfig); + return new BytesKvSourceRecordMapperConfig(finalConfig); + } + } +} diff --git a/src/test/java/com/github/castorm/kafka/connect/http/record/BytesKvSourceRecordMapperTest.java b/src/test/java/com/github/castorm/kafka/connect/http/record/BytesKvSourceRecordMapperTest.java new file mode 100644 index 00000000..0cbec8fb --- /dev/null +++ b/src/test/java/com/github/castorm/kafka/connect/http/record/BytesKvSourceRecordMapperTest.java @@ -0,0 +1,106 @@ +package com.github.castorm.kafka.connect.http.record; + +/*- + * #%L + * kafka-connect-http + * %% + * Copyright (C) 2020 CastorM + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * #L% + */ + +import com.github.castorm.kafka.connect.http.model.Offset; +import com.github.castorm.kafka.connect.http.record.model.KvRecord; +import com.google.common.collect.ImmutableMap; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; + +import java.nio.charset.Charset; +import java.time.Instant; + +import static com.github.castorm.kafka.connect.http.record.BytesKvSourceRecordMapperTest.Fixture.charset; +import static com.github.castorm.kafka.connect.http.record.BytesKvSourceRecordMapperTest.Fixture.now; +import static com.github.castorm.kafka.connect.http.record.BytesKvSourceRecordMapperTest.Fixture.offset; +import static com.github.castorm.kafka.connect.http.record.BytesKvSourceRecordMapperTest.Fixture.record; +import static java.time.Instant.now; +import static java.util.Collections.emptyMap; +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.BDDMockito.given; + +@ExtendWith(MockitoExtension.class) +class BytesKvSourceRecordMapperTest { + + BytesKvSourceRecordMapper mapper; + + @Mock + BytesKvSourceRecordMapperConfig config; + + @BeforeEach + void setUp() { + given(config.getTopic()).willReturn("topic"); + given(config.getCharset()).willReturn(charset); + mapper = new BytesKvSourceRecordMapper(__ -> config); + mapper.configure(emptyMap()); + } + + @Test + void givenTopic_whenMap_thenTopicMapped() { + assertThat(mapper.map(record).topic()).isEqualTo("topic"); + } + + @Test + void givenKey_whenMap_thenIdMapped() { + assertThat(mapper.map(record.withKey("value")).key()).isEqualTo("value".getBytes(charset)); + } + + @Test + void givenValue_whenMap_thenBodyMapped() { + assertThat(mapper.map(record.withValue("value")).value()).isEqualTo("value".getBytes(charset)); + } + + @Test + void givenOffset_whenMap_thenOffsetMapped() { + assertThat(mapper.map(record.withOffset(offset)).sourceOffset()).isEqualTo(offset.toMap()); + } + + @Test + void givenTimestamp_whenMap_thenTimestampMapped() { + assertThat(mapper.map(record.withOffset(offset)).timestamp()).isEqualTo(now.toEpochMilli()); + } + + @Test + void whenMap_thenNoPartitionMapped() { + assertThat(mapper.map(record).kafkaPartition()).isNull(); + } + + @Test + void whenMap_thenKeySchemaMapped() { + assertThat(mapper.map(record).keySchema()).isNotNull(); + } + + @Test + void whenMap_thenValueSchemaMapped() { + assertThat(mapper.map(record).valueSchema()).isNotNull(); + } + + interface Fixture { + Instant now = now(); + Offset offset = Offset.of(ImmutableMap.of("k", "v"), "key", now); + KvRecord record = KvRecord.builder().key("not-null").value("not-null").offset(offset).build(); + Charset charset = Charset.forName("UTF-8"); + } +} diff --git a/src/test/java/com/github/castorm/kafka/connect/http/record/StringKvSourceRecordMapperTest.java b/src/test/java/com/github/castorm/kafka/connect/http/record/StringKvSourceRecordMapperTest.java index 4f3da851..a9e5c399 100644 --- a/src/test/java/com/github/castorm/kafka/connect/http/record/StringKvSourceRecordMapperTest.java +++ b/src/test/java/com/github/castorm/kafka/connect/http/record/StringKvSourceRecordMapperTest.java @@ -31,9 +31,9 @@ import java.time.Instant; -import static com.github.castorm.kafka.connect.http.record.SchemedKvSourceRecordMapperTest.Fixture.now; -import static com.github.castorm.kafka.connect.http.record.SchemedKvSourceRecordMapperTest.Fixture.offset; -import static com.github.castorm.kafka.connect.http.record.SchemedKvSourceRecordMapperTest.Fixture.record; +import static com.github.castorm.kafka.connect.http.record.StringKvSourceRecordMapperTest.Fixture.now; +import static com.github.castorm.kafka.connect.http.record.StringKvSourceRecordMapperTest.Fixture.offset; +import static com.github.castorm.kafka.connect.http.record.StringKvSourceRecordMapperTest.Fixture.record; import static java.time.Instant.now; import static java.util.Collections.emptyMap; import static org.assertj.core.api.Assertions.assertThat;