diff --git a/kafka-connect-http/src/main/java/com/github/castorm/kafka/connect/http/HttpSourceConnectorConfig.java b/kafka-connect-http/src/main/java/com/github/castorm/kafka/connect/http/HttpSourceConnectorConfig.java index 988318f3..2be8279c 100644 --- a/kafka-connect-http/src/main/java/com/github/castorm/kafka/connect/http/HttpSourceConnectorConfig.java +++ b/kafka-connect-http/src/main/java/com/github/castorm/kafka/connect/http/HttpSourceConnectorConfig.java @@ -42,8 +42,9 @@ import static com.github.castorm.kafka.connect.common.ConfigUtils.breakDownMap; import static org.apache.kafka.common.config.ConfigDef.Importance.HIGH; import static org.apache.kafka.common.config.ConfigDef.Importance.LOW; -import static org.apache.kafka.common.config.ConfigDef.Type.CLASS; import static org.apache.kafka.common.config.ConfigDef.Type.STRING; +import static org.apache.kafka.common.config.ConfigDef.Type.CLASS; +import static org.apache.kafka.common.config.ConfigDef.Type.BOOLEAN; @Getter class HttpSourceConnectorConfig extends AbstractConfig { @@ -55,6 +56,11 @@ class HttpSourceConnectorConfig extends AbstractConfig { private static final String RECORD_SORTER = "http.record.sorter"; private static final String RECORD_FILTER_FACTORY = "http.record.filter.factory"; private static final String OFFSET_INITIAL = "http.offset.initial"; + private static final String HANDLE_PAGINATION = "http.request.pagination.handle"; + private static final String NEXT_PAGE_URL_MODE = "http.request.pagination.mode"; + private static final String NEXT_PAGE_BASE_URL = "http.request.pagination.baseurl"; + private static final String OVERWRITE = "overwrite"; + private static final String APPEND = "append"; private final TimerThrottler throttler; private final HttpRequestFactory requestFactory; @@ -63,6 +69,9 @@ class HttpSourceConnectorConfig extends AbstractConfig { private final SourceRecordFilterFactory recordFilterFactory; private final SourceRecordSorter recordSorter; private final Map initialOffset; + private final Boolean handlePagination; + private final Boolean appendNextPageUrl; + private final String baseUrl; HttpSourceConnectorConfig(Map originals) { super(config(), originals); @@ -74,6 +83,9 @@ class HttpSourceConnectorConfig extends AbstractConfig { recordSorter = getConfiguredInstance(RECORD_SORTER, SourceRecordSorter.class); recordFilterFactory = getConfiguredInstance(RECORD_FILTER_FACTORY, SourceRecordFilterFactory.class); initialOffset = breakDownMap(getString(OFFSET_INITIAL)); + handlePagination = getBoolean(HANDLE_PAGINATION); + appendNextPageUrl = getString(NEXT_PAGE_URL_MODE).equalsIgnoreCase(APPEND); + baseUrl = getString(NEXT_PAGE_BASE_URL); } public static ConfigDef config() { @@ -84,6 +96,9 @@ public static ConfigDef config() { .define(RESPONSE_PARSER, CLASS, PolicyHttpResponseParser.class, HIGH, "Response Parser Class") .define(RECORD_SORTER, CLASS, OrderDirectionSourceRecordSorter.class, LOW, "Record Sorter Class") .define(RECORD_FILTER_FACTORY, CLASS, OffsetRecordFilterFactory.class, LOW, "Record Filter Factory Class") - .define(OFFSET_INITIAL, STRING, "", HIGH, "Starting offset"); + .define(OFFSET_INITIAL, STRING, "", HIGH, "Starting offset") + .define(HANDLE_PAGINATION, BOOLEAN, false, LOW, "Handle Pagination") + .define(NEXT_PAGE_URL_MODE, STRING, OVERWRITE, ConfigDef.ValidString.in(APPEND, OVERWRITE), LOW, "Append or overwrite the next page URL") + .define(NEXT_PAGE_BASE_URL, STRING, "", LOW, "Base URL in case of append mode"); } } diff --git a/kafka-connect-http/src/main/java/com/github/castorm/kafka/connect/http/HttpSourceTask.java b/kafka-connect-http/src/main/java/com/github/castorm/kafka/connect/http/HttpSourceTask.java index 1a86c117..bb2cd2bf 100644 --- a/kafka-connect-http/src/main/java/com/github/castorm/kafka/connect/http/HttpSourceTask.java +++ b/kafka-connect-http/src/main/java/com/github/castorm/kafka/connect/http/HttpSourceTask.java @@ -40,8 +40,7 @@ import java.io.IOException; import java.time.Instant; -import java.util.List; -import java.util.Map; +import java.util.*; import java.util.function.Function; import static com.github.castorm.kafka.connect.common.VersionUtils.getVersion; @@ -67,6 +66,16 @@ public class HttpSourceTask extends SourceTask { private SourceRecordFilterFactory recordFilterFactory; + private Boolean handlePagination; + + private Boolean appendNextPageUrl; + + private String baseUrl; + + private String modifiedUrl; + + private HttpRequest request = null; + private ConfirmationWindow> confirmationWindow = new ConfirmationWindow<>(emptyList()); @Getter @@ -92,6 +101,10 @@ public void start(Map settings) { recordSorter = config.getRecordSorter(); recordFilterFactory = config.getRecordFilterFactory(); offset = loadOffset(config.getInitialOffset()); + handlePagination = !Objects.isNull(config.getHandlePagination()) && config.getHandlePagination(); + appendNextPageUrl = !Objects.isNull(config.getAppendNextPageUrl()) && config.getAppendNextPageUrl(); + baseUrl = config.getBaseUrl(); + modifiedUrl = null; } private Offset loadOffset(Map initialOffset) { @@ -104,11 +117,37 @@ public List poll() throws InterruptedException { throttler.throttle(offset.getTimestamp().orElseGet(Instant::now)); - HttpRequest request = requestFactory.createRequest(offset); +// HttpRequest request = requestFactory.createRequest(offset); + + List records = new ArrayList<>(); + + if(handlePagination && !Objects.isNull(modifiedUrl)) { + request = HttpRequest.builder() + .method(request.getMethod()) + .url(modifiedUrl) + .headers(request.getHeaders()) + .body(request.getBody()) + .build(); + } else { + request = requestFactory.createRequest(offset); + } HttpResponse response = execute(request); - List records = responseParser.parse(response); + records.addAll(responseParser.parse(response)); + + if(handlePagination) { + Optional nextPageUrl = responseParser.getNextPageUrl(response); + log.info("Next page URL: {}", nextPageUrl.orElse("no value")); + if( isNextPageUrlPresent(nextPageUrl) ) { + modifiedUrl = appendNextPageUrl + ? baseUrl + nextPageUrl.orElse("") + : nextPageUrl.orElse(null); + } else { + modifiedUrl = null; + } + } + List unseenRecords = recordSorter.sort(records).stream() .filter(recordFilterFactory.create(offset)) @@ -129,6 +168,13 @@ private HttpResponse execute(HttpRequest request) { } } + private Boolean isNextPageUrlPresent(Optional nextPageUrl) { + return nextPageUrl.isPresent() && + !Objects.isNull(nextPageUrl.orElse(null)) && + !nextPageUrl.orElse(null).equalsIgnoreCase("null"); + } + + private static List> extractOffsets(List recordsToSend) { return recordsToSend.stream() .map(SourceRecord::sourceOffset) diff --git a/kafka-connect-http/src/main/java/com/github/castorm/kafka/connect/http/response/KvHttpResponseParser.java b/kafka-connect-http/src/main/java/com/github/castorm/kafka/connect/http/response/KvHttpResponseParser.java index d41a6acf..1e543ddc 100644 --- a/kafka-connect-http/src/main/java/com/github/castorm/kafka/connect/http/response/KvHttpResponseParser.java +++ b/kafka-connect-http/src/main/java/com/github/castorm/kafka/connect/http/response/KvHttpResponseParser.java @@ -29,6 +29,7 @@ import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.function.Function; import static java.util.stream.Collectors.toList; @@ -59,4 +60,8 @@ public List parse(HttpResponse response) { .map(recordMapper::map) .collect(toList()); } + + public Optional getNextPageUrl(HttpResponse response) { + return recordParser.getNextPageUrl(response); + } } diff --git a/kafka-connect-http/src/main/java/com/github/castorm/kafka/connect/http/response/PolicyHttpResponseParser.java b/kafka-connect-http/src/main/java/com/github/castorm/kafka/connect/http/response/PolicyHttpResponseParser.java index b6e16e0e..bfa45fcb 100644 --- a/kafka-connect-http/src/main/java/com/github/castorm/kafka/connect/http/response/PolicyHttpResponseParser.java +++ b/kafka-connect-http/src/main/java/com/github/castorm/kafka/connect/http/response/PolicyHttpResponseParser.java @@ -28,6 +28,7 @@ import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.function.Function; import static java.util.Collections.emptyList; @@ -65,4 +66,17 @@ public List parse(HttpResponse response) { throw new IllegalStateException(String.format("Policy failed for response code: %s, body: %s", response.getCode(), ofNullable(response.getBody()).map(String::new).orElse(""))); } } + + @Override + public Optional getNextPageUrl(HttpResponse response) { + switch (policy.resolve(response)) { + case PROCESS: + return delegate.getNextPageUrl(response); + case SKIP: + return Optional.empty(); + case FAIL: + default: + throw new IllegalStateException(String.format("Policy failed for response code: %s, body: %s", response.getCode(), ofNullable(response.getBody()).map(String::new).orElse(""))); + } + } } diff --git a/kafka-connect-http/src/main/java/com/github/castorm/kafka/connect/http/response/jackson/JacksonKvRecordHttpResponseParser.java b/kafka-connect-http/src/main/java/com/github/castorm/kafka/connect/http/response/jackson/JacksonKvRecordHttpResponseParser.java index ddb00e76..cfb1c6cf 100644 --- a/kafka-connect-http/src/main/java/com/github/castorm/kafka/connect/http/response/jackson/JacksonKvRecordHttpResponseParser.java +++ b/kafka-connect-http/src/main/java/com/github/castorm/kafka/connect/http/response/jackson/JacksonKvRecordHttpResponseParser.java @@ -65,6 +65,11 @@ public List parse(HttpResponse response) { .collect(toList()); } + @Override + public Optional getNextPageUrl(HttpResponse response) { + return responseParser.getNextPageUrl(response.getBody()); + } + private KvRecord map(JacksonRecord record) { Map offsets = record.getOffset(); diff --git a/kafka-connect-http/src/main/java/com/github/castorm/kafka/connect/http/response/jackson/JacksonRecordParser.java b/kafka-connect-http/src/main/java/com/github/castorm/kafka/connect/http/response/jackson/JacksonRecordParser.java index 46646a40..140828c7 100644 --- a/kafka-connect-http/src/main/java/com/github/castorm/kafka/connect/http/response/jackson/JacksonRecordParser.java +++ b/kafka-connect-http/src/main/java/com/github/castorm/kafka/connect/http/response/jackson/JacksonRecordParser.java @@ -61,7 +61,7 @@ public void configure(Map settings) { keyPointer = config.getKeyPointer(); valuePointer = config.getValuePointer(); offsetPointers = config.getOffsetPointers(); - timestampPointer = config.getTimestampPointer(); + } /** diff --git a/kafka-connect-http/src/main/java/com/github/castorm/kafka/connect/http/response/jackson/JacksonRecordParserConfig.java b/kafka-connect-http/src/main/java/com/github/castorm/kafka/connect/http/response/jackson/JacksonRecordParserConfig.java index 1bb8780a..9fbe17c6 100644 --- a/kafka-connect-http/src/main/java/com/github/castorm/kafka/connect/http/response/jackson/JacksonRecordParserConfig.java +++ b/kafka-connect-http/src/main/java/com/github/castorm/kafka/connect/http/response/jackson/JacksonRecordParserConfig.java @@ -39,6 +39,7 @@ import static java.util.stream.Collectors.toMap; import static org.apache.kafka.common.config.ConfigDef.Importance.HIGH; import static org.apache.kafka.common.config.ConfigDef.Importance.MEDIUM; +import static org.apache.kafka.common.config.ConfigDef.Importance.LOW; import static org.apache.kafka.common.config.ConfigDef.Type.STRING; @Getter @@ -49,12 +50,14 @@ public class JacksonRecordParserConfig extends AbstractConfig { private static final String ITEM_KEY_POINTER = "http.response.record.key.pointer"; private static final String ITEM_TIMESTAMP_POINTER = "http.response.record.timestamp.pointer"; private static final String ITEM_OFFSET_VALUE_POINTER = "http.response.record.offset.pointer"; + private static final String NEXT_PAGE_POINTER = "http.response.next.page.pointer"; private final JsonPointer recordsPointer; private final List keyPointer; private final JsonPointer valuePointer; private final Optional timestampPointer; private final Map offsetPointers; + private final Optional nextPagePointer; JacksonRecordParserConfig(Map originals) { super(config(), originals); @@ -65,6 +68,7 @@ public class JacksonRecordParserConfig extends AbstractConfig { offsetPointers = breakDownMap(getString(ITEM_OFFSET_VALUE_POINTER)).entrySet().stream() .map(entry -> new SimpleEntry<>(entry.getKey(), compile(entry.getValue()))) .collect(toMap(Entry::getKey, Entry::getValue)); + nextPagePointer = ofNullable(getString(NEXT_PAGE_POINTER)).map(JsonPointer::compile); } public static ConfigDef config() { @@ -73,6 +77,7 @@ public static ConfigDef config() { .define(ITEM_POINTER, STRING, "/", HIGH, "Item JsonPointer") .define(ITEM_KEY_POINTER, STRING, null, HIGH, "Item Key JsonPointers") .define(ITEM_TIMESTAMP_POINTER, STRING, null, MEDIUM, "Item Timestamp JsonPointer") - .define(ITEM_OFFSET_VALUE_POINTER, STRING, "", MEDIUM, "Item Offset JsonPointers"); + .define(ITEM_OFFSET_VALUE_POINTER, STRING, "", MEDIUM, "Item Offset JsonPointers") + .define(NEXT_PAGE_POINTER, STRING, "/next", LOW, "Pointer for next page"); } } diff --git a/kafka-connect-http/src/main/java/com/github/castorm/kafka/connect/http/response/jackson/JacksonResponseRecordParser.java b/kafka-connect-http/src/main/java/com/github/castorm/kafka/connect/http/response/jackson/JacksonResponseRecordParser.java index 6caff747..49bca82c 100644 --- a/kafka-connect-http/src/main/java/com/github/castorm/kafka/connect/http/response/jackson/JacksonResponseRecordParser.java +++ b/kafka-connect-http/src/main/java/com/github/castorm/kafka/connect/http/response/jackson/JacksonResponseRecordParser.java @@ -28,6 +28,7 @@ import org.apache.kafka.common.Configurable; import java.util.Map; +import java.util.Optional; import java.util.function.Function; import java.util.stream.Stream; @@ -45,6 +46,10 @@ public class JacksonResponseRecordParser implements Configurable { private JsonPointer recordsPointer; + private Optional nextPagePointer; + + private JsonNode jsonBody; + public JacksonResponseRecordParser() { this(new JacksonRecordParser(), new JacksonSerializer(new ObjectMapper())); } @@ -61,7 +66,7 @@ public void configure(Map settings) { Stream getRecords(byte[] body) { - JsonNode jsonBody = serializer.deserialize(body); + this.jsonBody = serializer.deserialize(body); Map responseOffset = getResponseOffset(jsonBody); @@ -69,6 +74,13 @@ Stream getRecords(byte[] body) { .map(jsonRecord -> toJacksonRecord(jsonRecord, responseOffset)); } + Optional getNextPageUrl(byte[] body) { + return nextPagePointer.map(pointer -> + serializer.checkIfNonNull(this.jsonBody, pointer) + ? serializer.getObjectAt(this.jsonBody, pointer).asText() + : null); + } + private Map getResponseOffset(JsonNode node) { return emptyMap(); } diff --git a/kafka-connect-http/src/main/java/com/github/castorm/kafka/connect/http/response/jackson/JacksonSerializer.java b/kafka-connect-http/src/main/java/com/github/castorm/kafka/connect/http/response/jackson/JacksonSerializer.java index f99bb3ba..a6818fd4 100644 --- a/kafka-connect-http/src/main/java/com/github/castorm/kafka/connect/http/response/jackson/JacksonSerializer.java +++ b/kafka-connect-http/src/main/java/com/github/castorm/kafka/connect/http/response/jackson/JacksonSerializer.java @@ -62,6 +62,11 @@ Stream getArrayAt(JsonNode node, JsonPointer pointer) { return array.isArray() ? stream(array.spliterator(), false) : Stream.of(array); } + boolean checkIfNonNull(JsonNode node, JsonPointer pointer) { + return !node.at(pointer).isMissingNode(); + } + + private static JsonNode getRequiredAt(JsonNode body, JsonPointer recordsPointer) { return JSON_ROOT.equals(recordsPointer) ? body : body.requiredAt(recordsPointer); } diff --git a/kafka-connect-http/src/main/java/com/github/castorm/kafka/connect/http/response/spi/HttpResponseParser.java b/kafka-connect-http/src/main/java/com/github/castorm/kafka/connect/http/response/spi/HttpResponseParser.java index df15b37d..584b8ea0 100644 --- a/kafka-connect-http/src/main/java/com/github/castorm/kafka/connect/http/response/spi/HttpResponseParser.java +++ b/kafka-connect-http/src/main/java/com/github/castorm/kafka/connect/http/response/spi/HttpResponseParser.java @@ -26,8 +26,8 @@ import java.util.List; import java.util.Map; +import java.util.Optional; -@FunctionalInterface public interface HttpResponseParser extends Configurable { List parse(HttpResponse response); @@ -35,4 +35,6 @@ public interface HttpResponseParser extends Configurable { default void configure(Map map) { // Do nothing } + + Optional getNextPageUrl(HttpResponse response); } diff --git a/kafka-connect-http/src/main/java/com/github/castorm/kafka/connect/http/response/spi/KvRecordHttpResponseParser.java b/kafka-connect-http/src/main/java/com/github/castorm/kafka/connect/http/response/spi/KvRecordHttpResponseParser.java index 80eb3b05..5f267321 100644 --- a/kafka-connect-http/src/main/java/com/github/castorm/kafka/connect/http/response/spi/KvRecordHttpResponseParser.java +++ b/kafka-connect-http/src/main/java/com/github/castorm/kafka/connect/http/response/spi/KvRecordHttpResponseParser.java @@ -26,8 +26,8 @@ import java.util.List; import java.util.Map; +import java.util.Optional; -@FunctionalInterface public interface KvRecordHttpResponseParser extends Configurable { List parse(HttpResponse response); @@ -35,4 +35,8 @@ public interface KvRecordHttpResponseParser extends Configurable { default void configure(Map map) { // Do nothing } + + Optional getNextPageUrl(HttpResponse response); + + } diff --git a/kafka-connect-http/src/test/java/com/github/castorm/kafka/connect/http/HttpSourceConnectorConfigTest.java b/kafka-connect-http/src/test/java/com/github/castorm/kafka/connect/http/HttpSourceConnectorConfigTest.java index 44634a92..311e24d7 100644 --- a/kafka-connect-http/src/test/java/com/github/castorm/kafka/connect/http/HttpSourceConnectorConfigTest.java +++ b/kafka-connect-http/src/test/java/com/github/castorm/kafka/connect/http/HttpSourceConnectorConfigTest.java @@ -43,6 +43,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Optional; import static com.github.castorm.kafka.connect.http.HttpSourceConnectorConfigTest.Fixture.config; import static com.github.castorm.kafka.connect.http.HttpSourceConnectorConfigTest.Fixture.configWithout; @@ -136,6 +137,11 @@ public static class TestResponseParser implements HttpResponseParser { public List parse(HttpResponse response) { return null; } + + @Override + public Optional getNextPageUrl(HttpResponse response) { + return Optional.empty(); + } } public static class TestRecordSorter implements SourceRecordSorter { diff --git a/kafka-connect-http/src/test/java/com/github/castorm/kafka/connect/http/response/KvHttpResponseParserConfigTest.java b/kafka-connect-http/src/test/java/com/github/castorm/kafka/connect/http/response/KvHttpResponseParserConfigTest.java index e21a1352..f8d550ab 100644 --- a/kafka-connect-http/src/test/java/com/github/castorm/kafka/connect/http/response/KvHttpResponseParserConfigTest.java +++ b/kafka-connect-http/src/test/java/com/github/castorm/kafka/connect/http/response/KvHttpResponseParserConfigTest.java @@ -33,6 +33,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Optional; import static java.util.Collections.emptyMap; import static org.assertj.core.api.Assertions.assertThat; @@ -65,6 +66,11 @@ public static class TestResponseParser implements KvRecordHttpResponseParser { public List parse(HttpResponse response) { return null; } + + @Override + public Optional getNextPageUrl(HttpResponse response) { + return Optional.empty(); + } } public static class TestRecordMapper implements KvSourceRecordMapper { diff --git a/kafka-connect-http/src/test/java/com/github/castorm/kafka/connect/http/response/PolicyHttpResponseParserConfigTest.java b/kafka-connect-http/src/test/java/com/github/castorm/kafka/connect/http/response/PolicyHttpResponseParserConfigTest.java index 750a79d2..2fa86ffb 100644 --- a/kafka-connect-http/src/test/java/com/github/castorm/kafka/connect/http/response/PolicyHttpResponseParserConfigTest.java +++ b/kafka-connect-http/src/test/java/com/github/castorm/kafka/connect/http/response/PolicyHttpResponseParserConfigTest.java @@ -30,6 +30,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Optional; import static java.util.Collections.emptyMap; import static org.assertj.core.api.Assertions.assertThat; @@ -62,6 +63,11 @@ public static class TestResponseParser implements HttpResponseParser { public List parse(HttpResponse response) { return null; } + + @Override + public Optional getNextPageUrl(HttpResponse response) { + return Optional.empty(); + } } public static class TestPolicy implements HttpResponsePolicy {