-
Notifications
You must be signed in to change notification settings - Fork 56
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
#88 | Initial idea to support pagination #90
base: master
Are you sure you want to change the base?
#88 | Initial idea to support pagination #90
Conversation
1c19100
to
28e173f
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hi @AkshatJindal1,
In general there are some issues with the extension of existing classes for purposes that violate their responsibilities. That shouldn't be a big deal, as it could be managed by finding the right abstractions, maybe a new abstraction is required to deal with the pagination specifics.
There are also some issues with some low level detail that is managed at a place where high level abstractions are used, again, not a big deal, a matter of moving code around.
Also, I would suggest you focus on a single variation of the problem first, until all abstractions are laid out, otherwise you might be adding too much complexity too soon, difficulting the task. I'm referring to the optionality of appendNextPageUrl
.
My main concern, though, has nothing to do with the above. It's about how state is being managed. Have you considered what would happen if the connector is restarted at any point?
The state you are relaying on, like modifiedUrl
or the body of the previous response, would be gone.
That's why it's important all relevant information sharing happens through the Offset
. That's the only information that's persisted and can be used for recovery.
Would there be a way of moving that relevant state to the offset of the records? I would focus on this problem first, before looking for the right abstractions.
Thanks,
Best regards.
public interface KvRecordHttpResponseParser extends Configurable { | ||
|
||
List<KvRecord> parse(HttpResponse response); | ||
|
||
default void configure(Map<String, ?> map) { | ||
// Do nothing | ||
} | ||
|
||
Optional<String> getNextPageUrl(HttpResponse response); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Same as HttpResponseParser
.
public interface HttpResponseParser extends Configurable { | ||
|
||
List<SourceRecord> parse(HttpResponse response); | ||
|
||
default void configure(Map<String, ?> map) { | ||
// Do nothing | ||
} | ||
|
||
Optional<String> getNextPageUrl(HttpResponse response); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The responsibility of this parser is to translate responses to records, this additional responsibility is not cohesive with it.
In other words, we are breaking single responsibility here. If the new functionality were cohesive with the existing one, maybe we could consider raising the abstraction level of this interface to absorb it, but I don't think it is.
serializer.checkIfNonNull(this.jsonBody, pointer) | ||
? serializer.getObjectAt(this.jsonBody, pointer).asText() | ||
: null); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this would belong better one level down at the serializer, by providing something like this:
Optional<JsonNode> getObjectAtIfPresent(JsonNode node, JsonPointer pointer)
That would mean you wouldn't need to expose checkIfNonNull
, as it would be leaving that responsibility closer to where the decision is made.
@@ -45,6 +46,10 @@ | |||
|
|||
private JsonPointer recordsPointer; | |||
|
|||
private Optional<JsonPointer> nextPagePointer; | |||
|
|||
private JsonNode jsonBody; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This class is meant to be stateless, we shouldn't be keeping this body around here.
@@ -73,6 +77,7 @@ public static ConfigDef config() { | |||
.define(ITEM_POINTER, STRING, "/", HIGH, "Item JsonPointer") | |||
.define(ITEM_KEY_POINTER, STRING, null, HIGH, "Item Key JsonPointers") | |||
.define(ITEM_TIMESTAMP_POINTER, STRING, null, MEDIUM, "Item Timestamp JsonPointer") | |||
.define(ITEM_OFFSET_VALUE_POINTER, STRING, "", MEDIUM, "Item Offset JsonPointers"); | |||
.define(ITEM_OFFSET_VALUE_POINTER, STRING, "", MEDIUM, "Item Offset JsonPointers") | |||
.define(NEXT_PAGE_POINTER, STRING, "/next", LOW, "Pointer for next page"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's probably not fair to assume something as arbitrary "/next"
as a default.
@Override | ||
public Optional<String> getNextPageUrl(HttpResponse response) { | ||
switch (policy.resolve(response)) { | ||
case PROCESS: | ||
return delegate.getNextPageUrl(response); | ||
case SKIP: | ||
return Optional.empty(); | ||
case FAIL: | ||
default: | ||
throw new IllegalStateException(String.format("Policy failed for response code: %s, body: %s", response.getCode(), ofNullable(response.getBody()).map(String::new).orElse(""))); | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The purpose of the PolicyHttpResponseParser
is to enforce a policy for whether response should be parsed, skipped, or failed based on http response status codes.
What's the reasoning to fit this functionality here? isn't it unrelated to the purpose of the class?
request = HttpRequest.builder() | ||
.method(request.getMethod()) | ||
.url(modifiedUrl) | ||
.headers(request.getHeaders()) | ||
.body(request.getBody()) | ||
.build(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We are creating a request here. Why not doing it in the entity meant for that: RequestFactory
?
You might want to create a RequestFactory
specific for this use case though.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I tried to create a different request factory to take care of pagination. But I am stuck at a point. I am not able to understand how to pass the next page URL from the response to this request factory to create a new request.
if(handlePagination) { | ||
Optional<String> nextPageUrl = responseParser.getNextPageUrl(response); | ||
log.info("Next page URL: {}", nextPageUrl.orElse("no value")); | ||
if( isNextPageUrlPresent(nextPageUrl) ) { | ||
modifiedUrl = appendNextPageUrl | ||
? baseUrl + nextPageUrl.orElse("") | ||
: nextPageUrl.orElse(null); | ||
} else { | ||
modifiedUrl = null; | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This should be extracted somewhere else, it doesn't seem to be at the same level of abstraction as the rest of the code in this method. Too detailed and specific about pagination.
|
||
private String modifiedUrl; | ||
|
||
private HttpRequest request = null; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why do you need this here?
This is not a final version to support pagination #88 . This is my idea. Please let me know if this approach looks good.