Fix: Paginate on latest Response #35560

Merged
merged 5 commits into from
Nov 11, 2023
27 changes: 14 additions & 13 deletions airflow/providers/http/operators/http.py
@@ -53,14 +53,15 @@ class HttpOperator(BaseOperator):
     :param data: The data to pass. POST-data in POST/PUT and params
         in the URL for a GET request. (templated)
     :param headers: The HTTP headers to be added to the GET request
-    :param pagination_function: A callable that generates the parameters used to call the API again.
-        Typically used when the API is paginated and returns for e.g a cursor, a 'next page id', or
-        a 'next page URL'. When provided, the Operator will call the API repeatedly until this callable
-        returns None. Also, the result of the Operator will become by default a list of Response.text
-        objects (instead of a single response object). Same with the other injected functions (like
-        response_check, response_filter, ...) which will also receive a list of Response object. This
-        function should return a dict of parameters (`endpoint`, `data`, `headers`, `extra_options`),
-        which will be merged and override the one used in the initial API call.
+    :param pagination_function: A callable that generates the parameters used to call the API again,
+        based on the previous response. Typically used when the API is paginated and returns e.g. a
+        cursor, a 'next page id', or a 'next page URL'. When provided, the Operator will call the API
+        repeatedly until this callable returns None. Also, the result of the Operator will become by
+        default a list of Response.text objects (instead of a single response object). Same with the
+        other injected functions (like response_check, response_filter, ...) which will also receive a
+        list of Response objects. This function receives a Response object from the previous call, and
+        should return a dict of parameters (`endpoint`, `data`, `headers`, `extra_options`), which will
+        be merged and will override the ones used in the initial API call.
     :param response_check: A check against the 'requests' response object.
         The callable takes the response object as the first positional argument
         and optionally any number of keyword arguments available in the context dictionary.
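
To make the contract in the docstring above concrete, here is a minimal sketch of a cursor-based pagination callable. It is an illustration only: the `next_cursor` field, the `cursor` request parameter, and the `FakeResponse` stand-in are hypothetical and not part of this PR. It also assumes that returning only the keys that change is acceptable, since the docstring says the returned values are merged with those of the initial call.

```python
from typing import Any, Optional


def cursor_pagination(response: Any) -> Optional[dict]:
    """Return overrides for the next call, or None when there are no more pages."""
    # `next_cursor` is a hypothetical field name; adapt it to the real API.
    next_cursor = response.json().get("next_cursor")
    if not next_cursor:
        # Returning None tells the operator to stop paginating.
        return None
    # Only the request data changes between pages in this sketch.
    return {"data": {"cursor": next_cursor}}


class FakeResponse:
    """Minimal stand-in for requests.Response, for illustration only."""

    def __init__(self, payload: dict) -> None:
        self._payload = payload

    def json(self) -> dict:
        return self._payload


print(cursor_pagination(FakeResponse({"items": [1, 2], "next_cursor": "abc"})))
print(cursor_pagination(FakeResponse({"items": [3]})))
```

Such a callable would be passed to the operator as `pagination_function=cursor_pagination`.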
@@ -162,16 +163,16 @@ def execute(self, context: Context) -> Any:
     def execute_sync(self, context: Context) -> Any:
         self.log.info("Calling HTTP method")
         response = self.hook.run(self.endpoint, self.data, self.headers, self.extra_options)
-        response = self.paginate_sync(first_response=response)
+        response = self.paginate_sync(response=response)
         return self.process_response(context=context, response=response)

-    def paginate_sync(self, first_response: Response) -> Response | list[Response]:
+    def paginate_sync(self, response: Response) -> Response | list[Response]:
         if not self.pagination_function:
-            return first_response
+            return response

-        all_responses = [first_response]
+        all_responses = [response]
         while True:
-            next_page_params = self.pagination_function(first_response)
+            next_page_params = self.pagination_function(response)
             if not next_page_params:
                 break
             response = self.hook.run(**self._merge_next_page_parameters(next_page_params))
Contributor Author

> If I'm not mistaken, your change just renames the method paginate_sync first_response param to response, how does this fix the bug?

Lines 177/178 assign each new response to the `response` variable.

With this PR, that reassigned response now reaches the pagination_function. Previously, the pagination function was called repeatedly with first_response, which never changes.

Member

Ah yes, the response value is updated after each iteration
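
The behaviour discussed in this thread can be reproduced outside Airflow with a small sketch. Everything here is a simplified stand-in written for illustration: `PAGES`, `fetch`, `make_pagination_function`, `paginate_buggy` and `paginate_fixed` are hypothetical names, and plain dicts replace `Response` objects. The two loops mirror the shape of `paginate_sync` before and after the change, using the same foo/bar pages as the updated test.

```python
from typing import Callable, List, Optional

# Hypothetical stand-in for the API: each page names the endpoint of the next one.
PAGES = {
    "/foo": {"value": 5, "endpoint": "/bar"},
    "/bar": {"value": 10, "endpoint": "/foo"},
}


def fetch(endpoint: str) -> dict:
    """Pretend HTTP call that returns the page stored for this endpoint."""
    return PAGES[endpoint]


def make_pagination_function(max_calls: int) -> Callable[[dict], Optional[dict]]:
    """Build a pagination function that returns None after `max_calls` calls."""
    calls = 0

    def pagination_function(response: dict) -> Optional[dict]:
        nonlocal calls
        if calls < max_calls:
            calls += 1
            return {"endpoint": response["endpoint"]}
        return None

    return pagination_function


def paginate_buggy(first_response: dict, pagination_function) -> List[dict]:
    all_responses = [first_response]
    while True:
        # Bug: the first response is passed on every iteration.
        next_page_params = pagination_function(first_response)
        if not next_page_params:
            break
        response = fetch(**next_page_params)
        all_responses.append(response)
    return all_responses


def paginate_fixed(response: dict, pagination_function) -> List[dict]:
    all_responses = [response]
    while True:
        # Fix: `response` is reassigned below, so the latest page is passed.
        next_page_params = pagination_function(response)
        if not next_page_params:
            break
        response = fetch(**next_page_params)
        all_responses.append(response)
    return all_responses


buggy = [r["value"] for r in paginate_buggy(fetch("/foo"), make_pagination_function(2))]
fixed = [r["value"] for r in paginate_fixed(fetch("/foo"), make_pagination_function(2))]
print(buggy)  # [5, 10, 10]
print(fixed)  # [5, 10, 5]
```

In the buggy loop the second page is requested twice because the pagination function only ever sees the first page; in the fixed loop it follows the chain foo, bar, foo.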

20 changes: 11 additions & 9 deletions tests/providers/http/operators/test_http.py
@@ -118,31 +118,33 @@ def test_paginated_responses(self, requests_mock):
         pagination_function is provided, and as long as this function returns
         a dictionary that overrides the previous call's parameters.
         """
-        has_returned: bool = False
+        iterations: int = 0

         def pagination_function(response: Response) -> dict | None:
             """Pagination function which returns None on the third call."""
-            nonlocal has_returned
-            if not has_returned:
-                has_returned = True
+            nonlocal iterations
+            if iterations < 2:
+                iterations += 1
                 return dict(
-                    endpoint="/",
-                    data={"cursor": "example"},
+                    endpoint=response.json()["endpoint"],
+                    data={},
                     headers={},
                     extra_options={},
                 )
             return None

-        requests_mock.get("http://www.example.com", json={"value": 5})
+        requests_mock.get("http://www.example.com/foo", json={"value": 5, "endpoint": "bar"})
+        requests_mock.get("http://www.example.com/bar", json={"value": 10, "endpoint": "foo"})
         operator = HttpOperator(
             task_id="test_HTTP_op",
             method="GET",
-            endpoint="/",
+            endpoint="/foo",
             http_conn_id="HTTP_EXAMPLE",
             pagination_function=pagination_function,
+            response_filter=lambda resp: [entry.json()["value"] for entry in resp],
         )
         result = operator.execute({})
-        assert result == ['{"value": 5}', '{"value": 5}']
+        assert result == [5, 10, 5]

     def test_async_paginated_responses(self, requests_mock):
         """