Skip to content

Commit

Permalink
castorm#88 | Initial idea to support pagination
Browse files Browse the repository at this point in the history
  • Loading branch information
AkshatJindal1 committed Jan 5, 2021
1 parent 5d02dd4 commit 28e173f
Show file tree
Hide file tree
Showing 14 changed files with 142 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,9 @@
import static com.github.castorm.kafka.connect.common.ConfigUtils.breakDownMap;
import static org.apache.kafka.common.config.ConfigDef.Importance.HIGH;
import static org.apache.kafka.common.config.ConfigDef.Importance.LOW;
import static org.apache.kafka.common.config.ConfigDef.Type.CLASS;
import static org.apache.kafka.common.config.ConfigDef.Type.STRING;
import static org.apache.kafka.common.config.ConfigDef.Type.CLASS;
import static org.apache.kafka.common.config.ConfigDef.Type.BOOLEAN;

@Getter
class HttpSourceConnectorConfig extends AbstractConfig {
Expand All @@ -55,6 +56,11 @@ class HttpSourceConnectorConfig extends AbstractConfig {
private static final String RECORD_SORTER = "http.record.sorter";
private static final String RECORD_FILTER_FACTORY = "http.record.filter.factory";
private static final String OFFSET_INITIAL = "http.offset.initial";
private static final String HANDLE_PAGINATION = "http.request.pagination.handle";
private static final String NEXT_PAGE_URL_MODE = "http.request.pagination.mode";
private static final String NEXT_PAGE_BASE_URL = "http.request.pagination.baseurl";
private static final String OVERWRITE = "overwrite";
private static final String APPEND = "append";

private final TimerThrottler throttler;
private final HttpRequestFactory requestFactory;
Expand All @@ -63,6 +69,9 @@ class HttpSourceConnectorConfig extends AbstractConfig {
private final SourceRecordFilterFactory recordFilterFactory;
private final SourceRecordSorter recordSorter;
private final Map<String, String> initialOffset;
private final Boolean handlePagination;
private final Boolean appendNextPageUrl;
private final String baseUrl;

HttpSourceConnectorConfig(Map<String, ?> originals) {
super(config(), originals);
Expand All @@ -74,6 +83,9 @@ class HttpSourceConnectorConfig extends AbstractConfig {
recordSorter = getConfiguredInstance(RECORD_SORTER, SourceRecordSorter.class);
recordFilterFactory = getConfiguredInstance(RECORD_FILTER_FACTORY, SourceRecordFilterFactory.class);
initialOffset = breakDownMap(getString(OFFSET_INITIAL));
handlePagination = getBoolean(HANDLE_PAGINATION);
appendNextPageUrl = getString(NEXT_PAGE_URL_MODE).equalsIgnoreCase(APPEND);
baseUrl = getString(NEXT_PAGE_BASE_URL);
}

public static ConfigDef config() {
Expand All @@ -84,6 +96,9 @@ public static ConfigDef config() {
.define(RESPONSE_PARSER, CLASS, PolicyHttpResponseParser.class, HIGH, "Response Parser Class")
.define(RECORD_SORTER, CLASS, OrderDirectionSourceRecordSorter.class, LOW, "Record Sorter Class")
.define(RECORD_FILTER_FACTORY, CLASS, OffsetRecordFilterFactory.class, LOW, "Record Filter Factory Class")
.define(OFFSET_INITIAL, STRING, "", HIGH, "Starting offset");
.define(OFFSET_INITIAL, STRING, "", HIGH, "Starting offset")
.define(HANDLE_PAGINATION, BOOLEAN, false, LOW, "Handle Pagination")
.define(NEXT_PAGE_URL_MODE, STRING, OVERWRITE, ConfigDef.ValidString.in(APPEND, OVERWRITE), LOW, "Append or overwrite the next page URL")
.define(NEXT_PAGE_BASE_URL, STRING, "", LOW, "Base URL in case of append mode");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,7 @@

import java.io.IOException;
import java.time.Instant;
import java.util.List;
import java.util.Map;
import java.util.*;
import java.util.function.Function;

import static com.github.castorm.kafka.connect.common.VersionUtils.getVersion;
Expand All @@ -67,6 +66,16 @@ public class HttpSourceTask extends SourceTask {

private SourceRecordFilterFactory recordFilterFactory;

private Boolean handlePagination;

private Boolean appendNextPageUrl;

private String baseUrl;

private String modifiedUrl;

private HttpRequest request = null;

private ConfirmationWindow<Map<String, ?>> confirmationWindow = new ConfirmationWindow<>(emptyList());

@Getter
Expand All @@ -92,6 +101,10 @@ public void start(Map<String, String> settings) {
recordSorter = config.getRecordSorter();
recordFilterFactory = config.getRecordFilterFactory();
offset = loadOffset(config.getInitialOffset());
handlePagination = !Objects.isNull(config.getHandlePagination()) && config.getHandlePagination();
appendNextPageUrl = !Objects.isNull(config.getAppendNextPageUrl()) && config.getAppendNextPageUrl();
baseUrl = config.getBaseUrl();
modifiedUrl = null;
}

private Offset loadOffset(Map<String, String> initialOffset) {
Expand All @@ -104,11 +117,37 @@ public List<SourceRecord> poll() throws InterruptedException {

throttler.throttle(offset.getTimestamp().orElseGet(Instant::now));

HttpRequest request = requestFactory.createRequest(offset);
// HttpRequest request = requestFactory.createRequest(offset);

List<SourceRecord> records = new ArrayList<>();

if(handlePagination && !Objects.isNull(modifiedUrl)) {
request = HttpRequest.builder()
.method(request.getMethod())
.url(modifiedUrl)
.headers(request.getHeaders())
.body(request.getBody())
.build();
} else {
request = requestFactory.createRequest(offset);
}

HttpResponse response = execute(request);

List<SourceRecord> records = responseParser.parse(response);
records.addAll(responseParser.parse(response));

if(handlePagination) {
Optional<String> nextPageUrl = responseParser.getNextPageUrl(response);
log.info("Next page URL: {}", nextPageUrl.orElse("no value"));
if( isNextPageUrlPresent(nextPageUrl) ) {
modifiedUrl = appendNextPageUrl
? baseUrl + nextPageUrl.orElse("")
: nextPageUrl.orElse(null);
} else {
modifiedUrl = null;
}
}


List<SourceRecord> unseenRecords = recordSorter.sort(records).stream()
.filter(recordFilterFactory.create(offset))
Expand All @@ -129,6 +168,13 @@ private HttpResponse execute(HttpRequest request) {
}
}

private Boolean isNextPageUrlPresent(Optional<String> nextPageUrl) {
return nextPageUrl.isPresent() &&
!Objects.isNull(nextPageUrl.orElse(null)) &&
!nextPageUrl.orElse(null).equalsIgnoreCase("null");
}


private static List<Map<String, ?>> extractOffsets(List<SourceRecord> recordsToSend) {
return recordsToSend.stream()
.map(SourceRecord::sourceOffset)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@

import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.Function;

import static java.util.stream.Collectors.toList;
Expand Down Expand Up @@ -59,4 +60,8 @@ public List<SourceRecord> parse(HttpResponse response) {
.map(recordMapper::map)
.collect(toList());
}

public Optional<String> getNextPageUrl(HttpResponse response) {
return recordParser.getNextPageUrl(response);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@

import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.Function;

import static java.util.Collections.emptyList;
Expand Down Expand Up @@ -65,4 +66,17 @@ public List<SourceRecord> parse(HttpResponse response) {
throw new IllegalStateException(String.format("Policy failed for response code: %s, body: %s", response.getCode(), ofNullable(response.getBody()).map(String::new).orElse("")));
}
}

@Override
public Optional<String> getNextPageUrl(HttpResponse response) {
switch (policy.resolve(response)) {
case PROCESS:
return delegate.getNextPageUrl(response);
case SKIP:
return Optional.empty();
case FAIL:
default:
throw new IllegalStateException(String.format("Policy failed for response code: %s, body: %s", response.getCode(), ofNullable(response.getBody()).map(String::new).orElse("")));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,11 @@ public List<KvRecord> parse(HttpResponse response) {
.collect(toList());
}

@Override
public Optional<String> getNextPageUrl(HttpResponse response) {
return responseParser.getNextPageUrl(response.getBody());
}

private KvRecord map(JacksonRecord record) {

Map<String, Object> offsets = record.getOffset();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ public void configure(Map<String, ?> settings) {
keyPointer = config.getKeyPointer();
valuePointer = config.getValuePointer();
offsetPointers = config.getOffsetPointers();
timestampPointer = config.getTimestampPointer();

}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import static java.util.stream.Collectors.toMap;
import static org.apache.kafka.common.config.ConfigDef.Importance.HIGH;
import static org.apache.kafka.common.config.ConfigDef.Importance.MEDIUM;
import static org.apache.kafka.common.config.ConfigDef.Importance.LOW;
import static org.apache.kafka.common.config.ConfigDef.Type.STRING;

@Getter
Expand All @@ -49,12 +50,14 @@ public class JacksonRecordParserConfig extends AbstractConfig {
private static final String ITEM_KEY_POINTER = "http.response.record.key.pointer";
private static final String ITEM_TIMESTAMP_POINTER = "http.response.record.timestamp.pointer";
private static final String ITEM_OFFSET_VALUE_POINTER = "http.response.record.offset.pointer";
private static final String NEXT_PAGE_POINTER = "http.response.next.page.pointer";

private final JsonPointer recordsPointer;
private final List<JsonPointer> keyPointer;
private final JsonPointer valuePointer;
private final Optional<JsonPointer> timestampPointer;
private final Map<String, JsonPointer> offsetPointers;
private final Optional<JsonPointer> nextPagePointer;

JacksonRecordParserConfig(Map<String, ?> originals) {
super(config(), originals);
Expand All @@ -65,6 +68,7 @@ public class JacksonRecordParserConfig extends AbstractConfig {
offsetPointers = breakDownMap(getString(ITEM_OFFSET_VALUE_POINTER)).entrySet().stream()
.map(entry -> new SimpleEntry<>(entry.getKey(), compile(entry.getValue())))
.collect(toMap(Entry::getKey, Entry::getValue));
nextPagePointer = ofNullable(getString(NEXT_PAGE_POINTER)).map(JsonPointer::compile);
}

public static ConfigDef config() {
Expand All @@ -73,6 +77,7 @@ public static ConfigDef config() {
.define(ITEM_POINTER, STRING, "/", HIGH, "Item JsonPointer")
.define(ITEM_KEY_POINTER, STRING, null, HIGH, "Item Key JsonPointers")
.define(ITEM_TIMESTAMP_POINTER, STRING, null, MEDIUM, "Item Timestamp JsonPointer")
.define(ITEM_OFFSET_VALUE_POINTER, STRING, "", MEDIUM, "Item Offset JsonPointers");
.define(ITEM_OFFSET_VALUE_POINTER, STRING, "", MEDIUM, "Item Offset JsonPointers")
.define(NEXT_PAGE_POINTER, STRING, "/next", LOW, "Pointer for next page");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.apache.kafka.common.Configurable;

import java.util.Map;
import java.util.Optional;
import java.util.function.Function;
import java.util.stream.Stream;

Expand All @@ -45,6 +46,10 @@ public class JacksonResponseRecordParser implements Configurable {

private JsonPointer recordsPointer;

private Optional<JsonPointer> nextPagePointer;

private JsonNode jsonBody;

public JacksonResponseRecordParser() {
this(new JacksonRecordParser(), new JacksonSerializer(new ObjectMapper()));
}
Expand All @@ -61,14 +66,21 @@ public void configure(Map<String, ?> settings) {

Stream<JacksonRecord> getRecords(byte[] body) {

JsonNode jsonBody = serializer.deserialize(body);
this.jsonBody = serializer.deserialize(body);

Map<String, Object> responseOffset = getResponseOffset(jsonBody);

return serializer.getArrayAt(jsonBody, recordsPointer)
.map(jsonRecord -> toJacksonRecord(jsonRecord, responseOffset));
}

Optional<String> getNextPageUrl(byte[] body) {
return nextPagePointer.map(pointer ->
serializer.checkIfNonNull(this.jsonBody, pointer)
? serializer.getObjectAt(this.jsonBody, pointer).asText()
: null);
}

private Map<String, Object> getResponseOffset(JsonNode node) {
return emptyMap();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,11 @@ Stream<JsonNode> getArrayAt(JsonNode node, JsonPointer pointer) {
return array.isArray() ? stream(array.spliterator(), false) : Stream.of(array);
}

boolean checkIfNonNull(JsonNode node, JsonPointer pointer) {
return !node.at(pointer).isMissingNode();
}


private static JsonNode getRequiredAt(JsonNode body, JsonPointer recordsPointer) {
return JSON_ROOT.equals(recordsPointer) ? body : body.requiredAt(recordsPointer);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,15 @@

import java.util.List;
import java.util.Map;
import java.util.Optional;

@FunctionalInterface
public interface HttpResponseParser extends Configurable {

List<SourceRecord> parse(HttpResponse response);

default void configure(Map<String, ?> map) {
// Do nothing
}

Optional<String> getNextPageUrl(HttpResponse response);
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,17 @@

import java.util.List;
import java.util.Map;
import java.util.Optional;

@FunctionalInterface
public interface KvRecordHttpResponseParser extends Configurable {

List<KvRecord> parse(HttpResponse response);

default void configure(Map<String, ?> map) {
// Do nothing
}

Optional<String> getNextPageUrl(HttpResponse response);


}
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

import static com.github.castorm.kafka.connect.http.HttpSourceConnectorConfigTest.Fixture.config;
import static com.github.castorm.kafka.connect.http.HttpSourceConnectorConfigTest.Fixture.configWithout;
Expand Down Expand Up @@ -136,6 +137,11 @@ public static class TestResponseParser implements HttpResponseParser {
public List<SourceRecord> parse(HttpResponse response) {
return null;
}

@Override
public Optional<String> getNextPageUrl(HttpResponse response) {
return Optional.empty();
}
}

public static class TestRecordSorter implements SourceRecordSorter {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

import static java.util.Collections.emptyMap;
import static org.assertj.core.api.Assertions.assertThat;
Expand Down Expand Up @@ -65,6 +66,11 @@ public static class TestResponseParser implements KvRecordHttpResponseParser {
public List<KvRecord> parse(HttpResponse response) {
return null;
}

@Override
public Optional<String> getNextPageUrl(HttpResponse response) {
return Optional.empty();
}
}

public static class TestRecordMapper implements KvSourceRecordMapper {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

import static java.util.Collections.emptyMap;
import static org.assertj.core.api.Assertions.assertThat;
Expand Down Expand Up @@ -62,6 +63,11 @@ public static class TestResponseParser implements HttpResponseParser {
public List<SourceRecord> parse(HttpResponse response) {
return null;
}

@Override
public Optional<String> getNextPageUrl(HttpResponse response) {
return Optional.empty();
}
}

public static class TestPolicy implements HttpResponsePolicy {
Expand Down

0 comments on commit 28e173f

Please sign in to comment.