diff --git a/README.md b/README.md index 6aade02b..8863da75 100644 --- a/README.md +++ b/README.md @@ -539,7 +539,7 @@ is especially important, as subsequent request are likely to produce similar res offered by this connector are order-dependent, as they are usually based on timestamps. To enable de-duplication in cases like this, we can instruct the connector to assume a specific order direction, either -`ASC`, `DESC`, or `IMPLICIT`, where implicit figures it out based on records' timestamps. +`ASC`, `DESC`, `IMPLICIT`, where the direction is inferred from the records' timestamps, or `ASC_FORCED_BY_TIMESTAMP`, where the records arrive unordered and are sorted by ascending timestamp. > #### `http.record.sorter` > ```java @@ -555,7 +555,7 @@ To enable de-duplication in cases like this, we can instruct the connector to as > > #### `http.response.list.order.direction` > Order direction of the results in the response list. -> * Type: `Enum { ASC, DESC, IMPLICIT }` +> * Type: `Enum { ASC, DESC, IMPLICIT, ASC_FORCED_BY_TIMESTAMP }` > * Default: `IMPLICIT` --- diff --git a/kafka-connect-http/src/main/java/com/github/castorm/kafka/connect/http/record/ObjectMapKvSourceRecordMapper.java b/kafka-connect-http/src/main/java/com/github/castorm/kafka/connect/http/record/ObjectMapKvSourceRecordMapper.java new file mode 100644 index 00000000..48f44fe0 --- /dev/null +++ b/kafka-connect-http/src/main/java/com/github/castorm/kafka/connect/http/record/ObjectMapKvSourceRecordMapper.java @@ -0,0 +1,108 @@ +package com.github.castorm.kafka.connect.http.record; + +/*- + * #%L + * Kafka Connect HTTP + * %% + * Copyright (C) 2020 - 2024 Cástor Rodríguez + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * #L% + */ + +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.github.castorm.kafka.connect.http.model.Offset; +import com.github.castorm.kafka.connect.http.record.model.KvRecord; +import com.github.castorm.kafka.connect.http.record.spi.KvSourceRecordMapper; +import lombok.RequiredArgsConstructor; +import org.apache.kafka.connect.source.SourceRecord; + +import java.time.Instant; +import java.util.HashMap; +import java.util.Map; +import java.util.function.Function; + +import static java.util.Collections.emptyMap; + +/** + * Maps a {@link KvRecord} to a schemaless {@link SourceRecord} whose value is the record's JSON + * payload parsed into a map and extended with the record key and timestamp. + */ +@RequiredArgsConstructor +public class ObjectMapKvSourceRecordMapper implements KvSourceRecordMapper { + + private static final String KEY_FIELD_NAME = "key"; + private static final String TIMESTAMP_FIELD_NAME = "timestamp"; + + private static final Map<String, ?> sourcePartition = emptyMap(); + + private final Function<Map<String, ?>, SourceRecordMapperConfig> configFactory; + + private SourceRecordMapperConfig config; + + // Shared Jackson mapper used to parse record values + private static final ObjectMapper objectMapper = new ObjectMapper(); + + /** Creates a mapper that builds its configuration with {@code SourceRecordMapperConfig::new}. */ + public ObjectMapKvSourceRecordMapper() { + this(SourceRecordMapperConfig::new); + } + + /** Builds this mapper's configuration from the given settings. */ + @Override + public void configure(Map<String, ?> settings) { + config = configFactory.apply(settings); + } + + /** + * Converts the record into a schemaless {@link SourceRecord} for the configured topic. + * + * <p>The value is parsed as a JSON object and the {@code key} and {@code timestamp} entries + * are added to it, replacing any entries with the same names. The timestamp comes from the + * record offset and falls back to the current time when the offset carries none. + * + * @param record record to convert + * @return the source record, keyed by the record key + * @throws RuntimeException if the value cannot be deserialized into a map + */ + @Override + public SourceRecord map(KvRecord record) { + + Offset offset = record.getOffset(); + Long timestamp = offset.getTimestamp().map(Instant::toEpochMilli).orElseGet(System::currentTimeMillis); + + String key = record.getKey(); + + Map<String, Object> deserializedValue; + try { 
+ deserializedValue = objectMapper.readValue(record.getValue().toString(), new TypeReference<Map<String, Object>>() {}); + } catch (Exception e) { + throw new RuntimeException("Failed to deserialize record value", e); + } + + deserializedValue.put(KEY_FIELD_NAME, key); + deserializedValue.put(TIMESTAMP_FIELD_NAME, timestamp); + + return new SourceRecord( + sourcePartition, + offset.toMap(), + config.getTopic(), + null, + null, + key, + null, + deserializedValue, + timestamp); + } +} diff --git a/kafka-connect-http/src/main/java/com/github/castorm/kafka/connect/http/record/OrderDirectionSourceRecordSorter.java b/kafka-connect-http/src/main/java/com/github/castorm/kafka/connect/http/record/OrderDirectionSourceRecordSorter.java index 813bb047..d9cc67ac 100644 --- a/kafka-connect-http/src/main/java/com/github/castorm/kafka/connect/http/record/OrderDirectionSourceRecordSorter.java +++ b/kafka-connect-http/src/main/java/com/github/castorm/kafka/connect/http/record/OrderDirectionSourceRecordSorter.java @@ -22,12 +22,15 @@ import com.github.castorm.kafka.connect.http.record.spi.SourceRecordSorter; import lombok.RequiredArgsConstructor; +import org.apache.kafka.connect.connector.ConnectRecord; import org.apache.kafka.connect.source.SourceRecord; import java.util.ArrayList; +import java.util.Comparator; import java.util.List; import java.util.Map; import java.util.function.Function; +import java.util.stream.Collectors; import static com.github.castorm.kafka.connect.http.record.OrderDirectionSourceRecordSorter.OrderDirection.ASC; import static com.github.castorm.kafka.connect.http.record.OrderDirectionSourceRecordSorter.OrderDirection.DESC; @@ -62,6 +65,9 @@ private static List<SourceRecord> sortWithDirection(List<SourceRecord> records, return reversed; case ASC: return records; + case ASC_FORCED_BY_TIMESTAMP: + // The response order is not trusted here: sort a copy by timestamp, oldest first + return records.stream().sorted(Comparator.comparing(ConnectRecord::timestamp)).collect(Collectors.toList()); case IMPLICIT: default: return sortWithDirection(records, getImplicitDirection(records)); @@ -78,6 +84,6 @@ 
private static OrderDirection getImplicitDirection(List<SourceRecord> records) { } public enum OrderDirection { - ASC, DESC, IMPLICIT + ASC, DESC, IMPLICIT, ASC_FORCED_BY_TIMESTAMP } } diff --git a/kafka-connect-http/src/main/java/com/github/castorm/kafka/connect/http/record/OrderDirectionSourceRecordSorterConfig.java b/kafka-connect-http/src/main/java/com/github/castorm/kafka/connect/http/record/OrderDirectionSourceRecordSorterConfig.java index 783401a3..2d5e2c73 100644 --- a/kafka-connect-http/src/main/java/com/github/castorm/kafka/connect/http/record/OrderDirectionSourceRecordSorterConfig.java +++ b/kafka-connect-http/src/main/java/com/github/castorm/kafka/connect/http/record/OrderDirectionSourceRecordSorterConfig.java @@ -44,6 +44,6 @@ public class OrderDirectionSourceRecordSorterConfig extends AbstractConfig { public static ConfigDef config() { return new ConfigDef() - .define(ORDER_DIRECTION, STRING, "IMPLICIT", LOW, "Order direction of the results in the list, either ASC, DESC or IMPLICIT"); + .define(ORDER_DIRECTION, STRING, "IMPLICIT", LOW, "Order direction of the results in the list, either ASC, DESC, IMPLICIT or ASC_FORCED_BY_TIMESTAMP"); } } diff --git a/kafka-connect-http/src/test/java/com/github/castorm/kafka/connect/http/record/OrderDirectionSourceRecordSorterTest.java b/kafka-connect-http/src/test/java/com/github/castorm/kafka/connect/http/record/OrderDirectionSourceRecordSorterTest.java index fb36aef7..c92e25dc 100644 --- a/kafka-connect-http/src/test/java/com/github/castorm/kafka/connect/http/record/OrderDirectionSourceRecordSorterTest.java +++ b/kafka-connect-http/src/test/java/com/github/castorm/kafka/connect/http/record/OrderDirectionSourceRecordSorterTest.java @@ -33,11 +33,13 @@ import static com.github.castorm.kafka.connect.http.record.OrderDirectionSourceRecordSorter.OrderDirection.ASC; import static com.github.castorm.kafka.connect.http.record.OrderDirectionSourceRecordSorter.OrderDirection.DESC; import static 
com.github.castorm.kafka.connect.http.record.OrderDirectionSourceRecordSorter.OrderDirection.IMPLICIT; +import static com.github.castorm.kafka.connect.http.record.OrderDirectionSourceRecordSorter.OrderDirection.ASC_FORCED_BY_TIMESTAMP; import static com.github.castorm.kafka.connect.http.record.OrderDirectionSourceRecordSorterTest.Fixture.mid; import static com.github.castorm.kafka.connect.http.record.OrderDirectionSourceRecordSorterTest.Fixture.newer; import static com.github.castorm.kafka.connect.http.record.OrderDirectionSourceRecordSorterTest.Fixture.older; import static com.github.castorm.kafka.connect.http.record.OrderDirectionSourceRecordSorterTest.Fixture.ordered; import static com.github.castorm.kafka.connect.http.record.OrderDirectionSourceRecordSorterTest.Fixture.reverseOrdered; +import static com.github.castorm.kafka.connect.http.record.OrderDirectionSourceRecordSorterTest.Fixture.unordered; import static java.lang.Long.MAX_VALUE; import static java.lang.Long.MIN_VALUE; import static java.util.Arrays.asList; @@ -100,6 +102,30 @@ void givenImplicit_whenReverseOrderedRecords_thenAsIs() { assertThat(sorter.sort(reverseOrdered)).containsExactly(older, mid, newer); } + @Test + void givenAscByTimestamp_whenOrderedRecords_thenAsIs() { + + givenDirection(ASC_FORCED_BY_TIMESTAMP); + + assertThat(sorter.sort(ordered)).containsExactly(older, mid, newer); + } + + @Test + void givenAscByTimestamp_whenReverseOrderedRecords_thenSortedAscending() { + + givenDirection(ASC_FORCED_BY_TIMESTAMP); + + assertThat(sorter.sort(reverseOrdered)).containsExactly(older, mid, newer); + } + + @Test + void givenAscByTimestamp_whenUnorderedRecords_thenSortedAscending() { + + givenDirection(ASC_FORCED_BY_TIMESTAMP); + + assertThat(sorter.sort(unordered)).containsExactly(older, mid, newer); + } + private void givenDirection(OrderDirection asc) { sorter = new OrderDirectionSourceRecordSorter(__ -> config); given(config.getOrderDirection()).willReturn(asc); @@ -111,6 +137,8 @@ interface Fixture { SourceRecord mid = 
new SourceRecord(null, null, null, null, null, null, null, null, 0L); SourceRecord newer = new SourceRecord(null, null, null, null, null, null, null, null, MAX_VALUE); List<SourceRecord> ordered = asList(older, mid, newer); + + List<SourceRecord> unordered = asList(older, newer, mid); List<SourceRecord> reverseOrdered = asList(newer, mid, older); } }