From 16751ee16b5f0314ec5c548e2e97c28b1ce72755 Mon Sep 17 00:00:00 2001 From: "Edgar R. M" Date: Wed, 31 Aug 2022 20:04:25 -0500 Subject: [PATCH 1/3] feat(taps): Implement reference paginators (#732) * feat: Implement paginators * docs: Add documentation for new implementation * Update singer_sdk/pagination.py * Update docs/porting.md Co-authored-by: Aaron ("AJ") Steers * Make linter happy * Make pre-commit happy * Use args and kwargs for base class * Full coverage for new code * Remove commented type var Co-authored-by: Aaron ("AJ") Steers --- ...singer_sdk.pagination.BaseAPIPaginator.rst | 7 + ...er_sdk.pagination.BaseHATEOASPaginator.rst | 7 + ...ger_sdk.pagination.BaseOffsetPaginator.rst | 7 + ...sdk.pagination.BasePageNumberPaginator.rst | 7 + ...ger_sdk.pagination.HeaderLinkPaginator.rst | 7 + ...inger_sdk.pagination.JSONPathPaginator.rst | 7 + ...gination.LegacyPaginatedStreamProtocol.rst | 7 + ...r_sdk.pagination.LegacyStreamPaginator.rst | 7 + ...r_sdk.pagination.SimpleHeaderPaginator.rst | 7 + ...ger_sdk.pagination.SinglePagePaginator.rst | 7 + docs/porting.md | 74 ++- docs/reference.rst | 19 + pyproject.toml | 1 + .../sample_tap_gitlab/gitlab_rest_streams.py | 36 +- singer_sdk/pagination.py | 422 ++++++++++++++++++ singer_sdk/streams/rest.py | 67 ++- tests/core/rest/test_pagination.py | 354 +++++++++++++++ tests/core/test_streams.py | 40 +- 18 files changed, 1015 insertions(+), 68 deletions(-) create mode 100644 docs/classes/singer_sdk.pagination.BaseAPIPaginator.rst create mode 100644 docs/classes/singer_sdk.pagination.BaseHATEOASPaginator.rst create mode 100644 docs/classes/singer_sdk.pagination.BaseOffsetPaginator.rst create mode 100644 docs/classes/singer_sdk.pagination.BasePageNumberPaginator.rst create mode 100644 docs/classes/singer_sdk.pagination.HeaderLinkPaginator.rst create mode 100644 docs/classes/singer_sdk.pagination.JSONPathPaginator.rst create mode 100644 docs/classes/singer_sdk.pagination.LegacyPaginatedStreamProtocol.rst create mode 100644 docs/classes/singer_sdk.pagination.LegacyStreamPaginator.rst create mode 100644 docs/classes/singer_sdk.pagination.SimpleHeaderPaginator.rst create mode 100644 docs/classes/singer_sdk.pagination.SinglePagePaginator.rst create mode 100644 singer_sdk/pagination.py create mode 100644 tests/core/rest/test_pagination.py diff --git a/docs/classes/singer_sdk.pagination.BaseAPIPaginator.rst b/docs/classes/singer_sdk.pagination.BaseAPIPaginator.rst new file mode 100644 index 000000000..f44e8d617 --- /dev/null +++ b/docs/classes/singer_sdk.pagination.BaseAPIPaginator.rst @@ -0,0 +1,7 @@ +singer_sdk.pagination.BaseAPIPaginator +====================================== + +.. currentmodule:: singer_sdk.pagination + +.. autoclass:: BaseAPIPaginator + :members: \ No newline at end of file diff --git a/docs/classes/singer_sdk.pagination.BaseHATEOASPaginator.rst b/docs/classes/singer_sdk.pagination.BaseHATEOASPaginator.rst new file mode 100644 index 000000000..b5a6f2b5e --- /dev/null +++ b/docs/classes/singer_sdk.pagination.BaseHATEOASPaginator.rst @@ -0,0 +1,7 @@ +singer_sdk.pagination.BaseHATEOASPaginator +========================================== + +.. currentmodule:: singer_sdk.pagination + +.. autoclass:: BaseHATEOASPaginator + :members: \ No newline at end of file diff --git a/docs/classes/singer_sdk.pagination.BaseOffsetPaginator.rst b/docs/classes/singer_sdk.pagination.BaseOffsetPaginator.rst new file mode 100644 index 000000000..ea7370d5b --- /dev/null +++ b/docs/classes/singer_sdk.pagination.BaseOffsetPaginator.rst @@ -0,0 +1,7 @@ +singer_sdk.pagination.BaseOffsetPaginator +========================================= + +.. currentmodule:: singer_sdk.pagination + +.. autoclass:: BaseOffsetPaginator + :members: \ No newline at end of file diff --git a/docs/classes/singer_sdk.pagination.BasePageNumberPaginator.rst b/docs/classes/singer_sdk.pagination.BasePageNumberPaginator.rst new file mode 100644 index 000000000..d33515ec3 --- /dev/null +++ b/docs/classes/singer_sdk.pagination.BasePageNumberPaginator.rst @@ -0,0 +1,7 @@ +singer_sdk.pagination.BasePageNumberPaginator +============================================= + +.. currentmodule:: singer_sdk.pagination + +.. autoclass:: BasePageNumberPaginator + :members: \ No newline at end of file diff --git a/docs/classes/singer_sdk.pagination.HeaderLinkPaginator.rst b/docs/classes/singer_sdk.pagination.HeaderLinkPaginator.rst new file mode 100644 index 000000000..f651a4116 --- /dev/null +++ b/docs/classes/singer_sdk.pagination.HeaderLinkPaginator.rst @@ -0,0 +1,7 @@ +singer_sdk.pagination.HeaderLinkPaginator +========================================= + +.. currentmodule:: singer_sdk.pagination + +.. autoclass:: HeaderLinkPaginator + :members: \ No newline at end of file diff --git a/docs/classes/singer_sdk.pagination.JSONPathPaginator.rst b/docs/classes/singer_sdk.pagination.JSONPathPaginator.rst new file mode 100644 index 000000000..59be0a5bf --- /dev/null +++ b/docs/classes/singer_sdk.pagination.JSONPathPaginator.rst @@ -0,0 +1,7 @@ +singer_sdk.pagination.JSONPathPaginator +======================================= + +.. currentmodule:: singer_sdk.pagination + +.. autoclass:: JSONPathPaginator + :members: \ No newline at end of file diff --git a/docs/classes/singer_sdk.pagination.LegacyPaginatedStreamProtocol.rst b/docs/classes/singer_sdk.pagination.LegacyPaginatedStreamProtocol.rst new file mode 100644 index 000000000..7a3f529de --- /dev/null +++ b/docs/classes/singer_sdk.pagination.LegacyPaginatedStreamProtocol.rst @@ -0,0 +1,7 @@ +singer_sdk.pagination.LegacyPaginatedStreamProtocol +=================================================== + +.. currentmodule:: singer_sdk.pagination + +.. autoclass:: LegacyPaginatedStreamProtocol + :members: \ No newline at end of file diff --git a/docs/classes/singer_sdk.pagination.LegacyStreamPaginator.rst b/docs/classes/singer_sdk.pagination.LegacyStreamPaginator.rst new file mode 100644 index 000000000..9d96831cd --- /dev/null +++ b/docs/classes/singer_sdk.pagination.LegacyStreamPaginator.rst @@ -0,0 +1,7 @@ +singer_sdk.pagination.LegacyStreamPaginator +=========================================== + +.. currentmodule:: singer_sdk.pagination + +.. autoclass:: LegacyStreamPaginator + :members: \ No newline at end of file diff --git a/docs/classes/singer_sdk.pagination.SimpleHeaderPaginator.rst b/docs/classes/singer_sdk.pagination.SimpleHeaderPaginator.rst new file mode 100644 index 000000000..4fce626e3 --- /dev/null +++ b/docs/classes/singer_sdk.pagination.SimpleHeaderPaginator.rst @@ -0,0 +1,7 @@ +singer_sdk.pagination.SimpleHeaderPaginator +=========================================== + +.. currentmodule:: singer_sdk.pagination + +.. autoclass:: SimpleHeaderPaginator + :members: \ No newline at end of file diff --git a/docs/classes/singer_sdk.pagination.SinglePagePaginator.rst b/docs/classes/singer_sdk.pagination.SinglePagePaginator.rst new file mode 100644 index 000000000..dc15a5d74 --- /dev/null +++ b/docs/classes/singer_sdk.pagination.SinglePagePaginator.rst @@ -0,0 +1,7 @@ +singer_sdk.pagination.SinglePagePaginator +========================================= + +.. currentmodule:: singer_sdk.pagination + +.. autoclass:: SinglePagePaginator + :members: \ No newline at end of file diff --git a/docs/porting.md b/docs/porting.md index 91e27796f..05b88a582 100644 --- a/docs/porting.md +++ b/docs/porting.md @@ -103,10 +103,82 @@ _Important: If you've gotten this far, this is a good time to commit your code b Pagination is generally unique for almost every API. There's no single method that solves for very different API's approach to pagination. -Most likely you will use `get_next_page_token` to parse and return whatever the "next page" token is for your source, and you'll use `get_url_params` to define how to pass the "next page" token back to the API when asking for subsequent pages. +Most likely you will use [get_new_paginator](singer_sdk.RESTStream.get_new_paginator) to instantiate a [pagination class](./classes/singer_sdk.pagination.BaseAPIPaginator) for your source, and you'll use `get_url_params` to define how to pass the "next page" token back to the API when asking for subsequent pages. When you think you have it right, run `poetry run tap-mysource` again, and debug until you are confident the result is including multiple pages back from the API. +You can also add unit tests for your pagination implementation for additional confidence: + +```python +from singer_sdk.pagination import BaseHATEOASPaginator, first + + +class CustomHATEOASPaginator(BaseHATEOASPaginator): + """Paginator for HATEOAS APIs - or "Hypermedia as the Engine of Application State". + + This paginator expects responses to have a key "next" with a value + like "https://api.com/link/to/next-item". + """" + + def get_next_url(self, response: Response) -> str | None: + """Get a parsed HATEOAS link for the next, if the response has one.""" + + try: + return first( + extract_jsonpath("$.links[?(@.rel=='next')].href", response.json()) + ) + except StopIteration: + return None + + +def test_paginator_custom_hateoas(): + """Validate paginator that my custom paginator.""" + + resource_path = "/path/to/resource" + response = Response() + paginator = CustomHATEOASPaginator() + assert not paginator.finished + assert paginator.current_value is None + assert paginator.count == 0 + + response._content = json.dumps( + { + "links": [ + { + "rel": "next", + "href": f"{resource_path}?page=2&limit=100", + } + ] + } + ).encode() + paginator.advance(response) + assert not paginator.finished + assert paginator.current_value.path == resource_path + assert paginator.current_value.query == "page=2&limit=100" + assert paginator.count == 1 + + response._content = json.dumps( + { + "links": [ + { + "rel": "next", + "href": f"{resource_path}?page=3&limit=100", + } + ] + } + ).encode() + paginator.advance(response) + assert not paginator.finished + assert paginator.current_value.path == resource_path + assert paginator.current_value.query == "page=3&limit=100" + assert paginator.count == 2 + + response._content = json.dumps({"links": []}).encode() + paginator.advance(response) + assert paginator.finished + assert paginator.count == 3 +``` + Note: Depending on how well the API is designed, this could take 5 minutes or multiple hours. If you need help, sometimes [PostMan](https://postman.com) or [Thunder Client](https://marketplace.visualstudio.com/items?itemName=rangav.vscode-thunder-client) can be helpful in debugging the APIs specific quirks. ## Run pytest diff --git a/docs/reference.rst b/docs/reference.rst index f4c517268..c3b4a0605 100644 --- a/docs/reference.rst +++ b/docs/reference.rst @@ -89,3 +89,22 @@ JSON Schema builder classes :template: module.rst typing + + +Pagination +---------- + +.. autosummary:: + :toctree: classes + :template: class.rst + + pagination.BaseAPIPaginator + pagination.SinglePagePaginator + pagination.BaseHATEOASPaginator + pagination.HeaderLinkPaginator + pagination.JSONPathPaginator + pagination.SimpleHeaderPaginator + pagination.BasePageNumberPaginator + pagination.BaseOffsetPaginator + pagination.LegacyPaginatedStreamProtocol + pagination.LegacyStreamPaginator diff --git a/pyproject.toml b/pyproject.toml index c0c962c1a..9859557c6 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -147,6 +147,7 @@ exclude_lines = [ "raise NotImplementedError", "if __name__ == .__main__.:", '''class .*\bProtocol\):''', + '''@(abc\.)?abstractmethod''', ] fail_under = 82 diff --git a/samples/sample_tap_gitlab/gitlab_rest_streams.py b/samples/sample_tap_gitlab/gitlab_rest_streams.py index 425348e6e..5c0bd44c3 100644 --- a/samples/sample_tap_gitlab/gitlab_rest_streams.py +++ b/samples/sample_tap_gitlab/gitlab_rest_streams.py @@ -1,11 +1,12 @@ """Sample tap stream test for tap-gitlab.""" -from pathlib import Path -from typing import Any, Dict, List, Optional, cast +from __future__ import annotations -import requests +from pathlib import Path +from typing import Any, cast from singer_sdk.authenticators import SimpleAuthenticator +from singer_sdk.pagination import SimpleHeaderPaginator from singer_sdk.streams.rest import RESTStream from singer_sdk.typing import ( ArrayType, @@ -21,7 +22,7 @@ DEFAULT_URL_BASE = "https://gitlab.com/api/v4" -class GitlabStream(RESTStream): +class GitlabStream(RESTStream[str]): """Sample tap test for gitlab.""" _LOG_REQUEST_METRIC_URLS = True @@ -39,8 +40,8 @@ def authenticator(self) -> SimpleAuthenticator: ) def get_url_params( - self, context: Optional[dict], next_page_token: Optional[Any] - ) -> Dict[str, Any]: + self, context: dict | None, next_page_token: str | None + ) -> dict[str, Any]: """Return a dictionary of values to be used in URL parameterization.""" params: dict = {} if next_page_token: @@ -50,21 +51,20 @@ def get_url_params( params["order_by"] = self.replication_key return params - def get_next_page_token( - self, response: requests.Response, previous_token: Optional[Any] - ) -> Optional[Any]: - """Return token for identifying next page or None if not applicable.""" - next_page_token = response.headers.get("X-Next-Page", None) - if next_page_token: - self.logger.debug(f"Next page token retrieved: {next_page_token}") - return next_page_token + def get_new_paginator(self) -> SimpleHeaderPaginator: + """Return a new paginator for GitLab API endpoints. + + Returns: + A new paginator. + """ + return SimpleHeaderPaginator("X-Next-Page") class ProjectBasedStream(GitlabStream): """Base class for streams that are keys based on project ID.""" @property - def partitions(self) -> List[dict]: + def partitions(self) -> list[dict]: """Return a list of partition key dicts (if applicable), otherwise None.""" if "{project_id}" in self.path: return [ @@ -162,7 +162,7 @@ class EpicsStream(ProjectBasedStream): # schema_filepath = SCHEMAS_DIR / "epics.json" - def get_child_context(self, record: dict, context: Optional[dict]) -> dict: + def get_child_context(self, record: dict, context: dict | None) -> dict: """Perform post processing, including queuing up any child stream types.""" # Ensure child state record(s) are created return { @@ -183,8 +183,8 @@ class EpicIssuesStream(GitlabStream): parent_stream_type = EpicsStream # Stream should wait for parents to complete. def get_url_params( - self, context: Optional[dict], next_page_token: Optional[Any] - ) -> Dict[str, Any]: + self, context: dict | None, next_page_token: str | None + ) -> dict[str, Any]: """Return a dictionary of values to be used in parameterization.""" result = super().get_url_params(context, next_page_token) if not context or "epic_id" not in context: diff --git a/singer_sdk/pagination.py b/singer_sdk/pagination.py new file mode 100644 index 000000000..8e294c3a5 --- /dev/null +++ b/singer_sdk/pagination.py @@ -0,0 +1,422 @@ +"""Generic paginator classes.""" + +from __future__ import annotations + +import sys +from abc import ABCMeta, abstractmethod +from typing import Any, Generic, Iterable, Optional, TypeVar +from urllib.parse import ParseResult, urlparse + +from requests import Response + +from singer_sdk.helpers.jsonpath import extract_jsonpath + +if sys.version_info >= (3, 8): + from typing import Protocol +else: + from typing_extensions import Protocol + +T = TypeVar("T") +TPageToken = TypeVar("TPageToken") + + +def first(iterable: Iterable[T]) -> T: + """Return the first element of an iterable or raise an exception. + + Args: + iterable: An iterable. + + Returns: + The first element of the iterable. + + >>> first('ABC') + 'A' + """ + return next(iter(iterable)) + + +class BaseAPIPaginator(Generic[TPageToken], metaclass=ABCMeta): + """An API paginator object.""" + + def __init__(self, start_value: TPageToken) -> None: + """Create a new paginator. + + Args: + start_value: Initial value. + """ + self._value: TPageToken = start_value + self._page_count = 0 + self._finished = False + self._last_seen_record: dict | None = None + + @property + def current_value(self) -> TPageToken: + """Get the current pagination value. + + Returns: + Current page value. + """ + return self._value + + @property + def finished(self) -> bool: + """Get a flag that indicates if the last page of data has been reached. + + Returns: + True if there are no more pages. + """ + return self._finished + + @property + def count(self) -> int: + """Count the number of pages traversed so far. + + Returns: + Number of pages. + """ + return self._page_count + + def __str__(self) -> str: + """Stringify this object. + + Returns: + String representation. + """ + return f"{self.__class__.__name__}<{self.current_value}>" + + def __repr__(self) -> str: + """Stringify this object. + + Returns: + String representation. + """ + return str(self) + + def advance(self, response: Response) -> None: + """Get a new page value and advance the current one. + + Args: + response: API response object. + + Raises: + RuntimeError: If a loop in pagination is detected. That is, when two + consecutive pagination tokens are identical. + """ + self._page_count += 1 + + if not self.has_more(response): + self._finished = True + return + + new_value = self.get_next(response) + + if new_value and new_value == self._value: + raise RuntimeError( + f"Loop detected in pagination. " + f"Pagination token {new_value} is identical to prior token." + ) + + # Stop if new value None, empty string, 0, etc. + if not new_value: + self._finished = True + else: + self._value = new_value + + def has_more(self, response: Response) -> bool: + """Override this method to check if the endpoint has any pages left. + + Args: + response: API response object. + + Returns: + Boolean flag used to indicate if the endpoint has more pages. + """ + return True + + @abstractmethod + def get_next(self, response: Response) -> TPageToken | None: + """Get the next pagination token or index from the API response. + + Args: + response: API response object. + + Returns: + The next page token or index. Return `None` from this method to indicate + the end of pagination. + """ + ... + + +class SinglePagePaginator(BaseAPIPaginator[None]): + """A paginator that does works with single-page endpoints.""" + + def __init__(self, *args: Any, **kwargs: Any) -> None: + """Create a new paginator. + + Args: + args: Paginator positional arguments for base class. + kwargs: Paginator keyword arguments for base class. + """ + super().__init__(None, *args, **kwargs) + + def get_next(self, response: Response) -> None: + """Get the next pagination token or index from the API response. + + Args: + response: API response object. + + Returns: + The next page token or index. Return `None` from this method to indicate + the end of pagination. + """ + return None + + +class BaseHATEOASPaginator(BaseAPIPaginator[Optional[ParseResult]], metaclass=ABCMeta): + """Paginator class for APIs supporting HATEOAS links in their response bodies. + + HATEOAS stands for "Hypermedia as the Engine of Application State". See + https://en.wikipedia.org/wiki/HATEOAS. + + This paginator expects responses to have a key "next" with a value + like "https://api.com/link/to/next-item". + """ + + def __init__(self, *args: Any, **kwargs: Any) -> None: + """Create a new paginator. + + Args: + args: Paginator positional arguments for base class. + kwargs: Paginator keyword arguments for base class. + """ + super().__init__(None, *args, **kwargs) + + @abstractmethod + def get_next_url(self, response: Response) -> str | None: + """Override this method to extract a HATEOAS link from the response. + + Args: + response: API response object. + """ + ... + + def get_next(self, response: Response) -> ParseResult | None: + """Get the next pagination token or index from the API response. + + Args: + response: API response object. + + Returns: + A parsed HATEOAS link if the response has one, otherwise `None`. + """ + next_url = self.get_next_url(response) + return urlparse(next_url) if next_url else None + + +class HeaderLinkPaginator(BaseHATEOASPaginator): + """Paginator class for APIs supporting HATEOAS links in their headers. + + Links: + - https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Link + - https://datatracker.ietf.org/doc/html/rfc8288#section-3 + """ + + def get_next_url(self, response: Response) -> str | None: + """Override this method to extract a HATEOAS link from the response. + + Args: + response: API response object. + + Returns: + A HATEOAS link parsed from the response headers. + """ + url: str | None = response.links.get("next", {}).get("url") + return url + + +class JSONPathPaginator(BaseAPIPaginator[Optional[str]]): + """Paginator class for APIs returning a pagination token in the response body.""" + + def __init__( + self, + jsonpath: str, + *args: Any, + **kwargs: Any, + ) -> None: + """Create a new paginator. + + Args: + jsonpath: A JSONPath expression. + args: Paginator positional arguments for base class. + kwargs: Paginator keyword arguments for base class. + """ + super().__init__(None, *args, **kwargs) + self._jsonpath = jsonpath + + def get_next(self, response: Response) -> str | None: + """Get the next page token. + + Args: + response: API response object. + + Returns: + The next page token. + """ + all_matches = extract_jsonpath(self._jsonpath, response.json()) + return next(all_matches, None) + + +class SimpleHeaderPaginator(BaseAPIPaginator[Optional[str]]): + """Paginator class for APIs returning a pagination token in the response headers.""" + + def __init__( + self, + key: str, + *args: Any, + **kwargs: Any, + ) -> None: + """Create a new paginator. + + Args: + key: Header key that contains the next page token. + args: Paginator positional arguments for base class. + kwargs: Paginator keyword arguments for base class. + """ + super().__init__(None, *args, **kwargs) + self._key = key + + def get_next(self, response: Response) -> str | None: + """Get the next page token. + + Args: + response: API response object. + + Returns: + The next page token. + """ + return response.headers.get(self._key, None) + + +class BasePageNumberPaginator(BaseAPIPaginator[int], metaclass=ABCMeta): + """Paginator class for APIs that use page number.""" + + @abstractmethod + def has_more(self, response: Response) -> bool: + """Override this method to check if the endpoint has any pages left. + + Args: + response: API response object. + + Returns: + Boolean flag used to indicate if the endpoint has more pages. + + """ + ... + + def get_next(self, response: Response) -> int | None: + """Get the next page number. + + Args: + response: API response object. + + Returns: + The next page number. + """ + return self._value + 1 + + +class BaseOffsetPaginator(BaseAPIPaginator[int], metaclass=ABCMeta): + """Paginator class for APIs that use page offset.""" + + def __init__( + self, + start_value: int, + page_size: int, + *args: Any, + **kwargs: Any, + ) -> None: + """Create a new paginator. + + Args: + start_value: Initial value. + page_size: Constant page size. + args: Paginator positional arguments. + kwargs: Paginator keyword arguments. + """ + super().__init__(start_value, *args, **kwargs) + self._page_size = page_size + + @abstractmethod + def has_more(self, response: Response) -> bool: + """Override this method to check if the endpoint has any pages left. + + Args: + response: API response object. + + Returns: + Boolean flag used to indicate if the endpoint has more pages. + """ + ... + + def get_next(self, response: Response) -> int | None: + """Get the next page offset. + + Args: + response: API response object. + + Returns: + The next page offset. + """ + return self._value + self._page_size + + +class LegacyPaginatedStreamProtocol(Protocol[TPageToken]): + """Protocol for legacy paginated streams classes.""" + + def get_next_page_token( + self, + response: Response, + previous_token: TPageToken | None, + ) -> TPageToken | None: + """Get the next page token. + + Args: + response: API response object. + previous_token: Previous page token. + """ + ... # pragma: no cover + + +class LegacyStreamPaginator( + BaseAPIPaginator[Optional[TPageToken]], + Generic[TPageToken], +): + """Paginator that works with REST streams as they exist today.""" + + def __init__( + self, + stream: LegacyPaginatedStreamProtocol[TPageToken], + *args: Any, + **kwargs: Any, + ) -> None: + """Create a new paginator. + + Args: + stream: A RESTStream instance. + args: Paginator positional arguments for base class. + kwargs: Paginator keyword arguments for base class. + """ + super().__init__(None, *args, **kwargs) + self.stream = stream + + def get_next(self, response: Response) -> TPageToken | None: + """Get next page value by calling the stream method. + + Args: + response: API response object. + + Returns: + The next page token or index. Return `None` from this method to indicate + the end of pagination. + """ + return self.stream.get_next_page_token(response, self.current_value) diff --git a/singer_sdk/streams/rest.py b/singer_sdk/streams/rest.py index dd302d04f..f69a0de4c 100644 --- a/singer_sdk/streams/rest.py +++ b/singer_sdk/streams/rest.py @@ -8,6 +8,7 @@ from datetime import datetime from typing import Any, Callable, Generator, Generic, Iterable, TypeVar, Union from urllib.parse import urlparse +from warnings import warn import backoff import requests @@ -16,6 +17,12 @@ from singer_sdk.authenticators import APIAuthenticatorBase, SimpleAuthenticator from singer_sdk.exceptions import FatalAPIError, RetriableAPIError from singer_sdk.helpers.jsonpath import extract_jsonpath +from singer_sdk.pagination import ( + BaseAPIPaginator, + JSONPathPaginator, + LegacyStreamPaginator, + SimpleHeaderPaginator, +) from singer_sdk.plugin_base import PluginBase as TapBaseClass from singer_sdk.streams.core import Stream @@ -335,33 +342,20 @@ def request_records(self, context: dict | None) -> Iterable[dict]: Yields: An item for every record in the response. - - Raises: - RuntimeError: If a loop in pagination is detected. That is, when two - consecutive pagination tokens are identical. """ - next_page_token: _TToken | None = None - finished = False + paginator = self.get_new_paginator() decorated_request = self.request_decorator(self._request) - while not finished: + while not paginator.finished: prepared_request = self.prepare_request( - context, next_page_token=next_page_token + context, + next_page_token=paginator.current_value, ) resp = decorated_request(prepared_request, context) self.update_sync_costs(prepared_request, resp, context) yield from self.parse_response(resp) - previous_token = copy.deepcopy(next_page_token) - next_page_token = self.get_next_page_token( - response=resp, previous_token=previous_token - ) - if next_page_token and next_page_token == previous_token: - raise RuntimeError( - f"Loop detected in pagination. " - f"Pagination token {next_page_token} is identical to prior token." - ) - # Cycle until get_next_page_token() no longer returns a value - finished = not next_page_token + + paginator.advance(resp) def update_sync_costs( self, @@ -441,33 +435,24 @@ def prepare_request_payload( """ return None - def get_next_page_token( - self, - response: requests.Response, - previous_token: _TToken | None, - ) -> _TToken | None: - """Return token identifying next page or None if all records have been read. - - Args: - response: A raw `requests.Response`_ object. - previous_token: Previous pagination reference. + def get_new_paginator(self) -> BaseAPIPaginator: + """Get a fresh paginator for this API endpoint. Returns: - Reference value to retrieve next page. - - .. _requests.Response: - https://requests.readthedocs.io/en/latest/api/#requests.Response + A paginator instance. """ - if self.next_page_token_jsonpath: - all_matches = extract_jsonpath( - self.next_page_token_jsonpath, response.json() + if hasattr(self, "get_next_page_token"): + warn( + "`RESTStream.get_next_page_token` is deprecated and will not be used " + + "in a future version of the Meltano SDK. " + + "Override `RESTStream.get_new_paginator` instead.", + DeprecationWarning, ) - first_match = next(iter(all_matches), None) - next_page_token = first_match + return LegacyStreamPaginator(self) # type: ignore + elif self.next_page_token_jsonpath: + return JSONPathPaginator(self.next_page_token_jsonpath) else: - next_page_token = response.headers.get("X-Next-Page", None) - - return next_page_token + return SimpleHeaderPaginator("X-Next-Page") @property def http_headers(self) -> dict: diff --git a/tests/core/rest/test_pagination.py b/tests/core/rest/test_pagination.py new file mode 100644 index 000000000..c61124610 --- /dev/null +++ b/tests/core/rest/test_pagination.py @@ -0,0 +1,354 @@ +"""Tests generic paginator classes.""" + +from __future__ import annotations + +import json +from typing import Any + +import pytest +from requests import Response + +from singer_sdk.helpers.jsonpath import extract_jsonpath +from singer_sdk.pagination import ( + BaseAPIPaginator, + BaseHATEOASPaginator, + BaseOffsetPaginator, + BasePageNumberPaginator, + HeaderLinkPaginator, + JSONPathPaginator, + SimpleHeaderPaginator, + SinglePagePaginator, + first, +) + + +def test_paginator_base_missing_implementation(): + """Validate that `BaseAPIPaginator` implementation requires `get_next`.""" + + with pytest.raises( + TypeError, + match="Can't instantiate abstract class .* get_next", + ): + BaseAPIPaginator(0) + + +def test_single_page_paginator(): + """Validate single page paginator.""" + + response = Response() + paginator = SinglePagePaginator() + assert not paginator.finished + assert paginator.current_value is None + assert paginator.count == 0 + + paginator.advance(response) + assert paginator.finished + assert paginator.current_value is None + assert paginator.count == 1 + + +def test_paginator_page_number_missing_implementation(): + """Validate that `BasePageNumberPaginator` implementation requires `has_more`.""" + + with pytest.raises( + TypeError, + match="Can't instantiate abstract class .* has_more", + ): + BasePageNumberPaginator(1) + + +def test_paginator_offset_missing_implementation(): + """Validate that `BaseOffsetPaginator` implementation requires `has_more`.""" + + with pytest.raises( + TypeError, + match="Can't instantiate abstract class .* has_more", + ): + BaseOffsetPaginator(0, 100) + + +def test_paginator_hateoas_missing_implementation(): + """Validate that `BaseHATEOASPaginator` implementation requires `get_next_url`.""" + + with pytest.raises( + TypeError, + match="Can't instantiate abstract class .* get_next_url", + ): + BaseHATEOASPaginator() + + +def test_paginator_attributes(): + """Validate paginator that uses the page number.""" + + response = Response() + paginator = JSONPathPaginator(jsonpath="$.nextPageToken") + assert str(paginator) == "JSONPathPaginator" + + response._content = b'{"nextPageToken": "abc"}' + paginator.advance(response) + assert str(paginator) == "JSONPathPaginator" + + +def test_paginator_loop(): + """Validate paginator that uses the page number.""" + + response = Response() + paginator = JSONPathPaginator(jsonpath="$.nextPageToken") + assert not paginator.finished + assert paginator.current_value is None + assert paginator.count == 0 + + response._content = b'{"nextPageToken": "abc"}' + paginator.advance(response) + assert not paginator.finished + assert paginator.current_value == "abc" + assert paginator.count == 1 + + response._content = b'{"nextPageToken": "abc"}' + with pytest.raises(RuntimeError, match="Loop detected in pagination"): + paginator.advance(response) + + +def test_paginator_page_number(): + """Validate paginator that uses the page number.""" + + class _TestPageNumberPaginator(BasePageNumberPaginator): + def has_more(self, response: Response) -> bool: + return response.json()["hasMore"] + + has_more_response = b'{"hasMore": true}' + no_more_response = b'{"hasMore": false}' + + response = Response() + paginator = _TestPageNumberPaginator(0) + assert not paginator.finished + assert paginator.current_value == 0 + assert paginator.count == 0 + + response._content = has_more_response + paginator.advance(response) + assert not paginator.finished + assert paginator.current_value == 1 + assert paginator.count == 1 + + response._content = has_more_response + paginator.advance(response) + assert not paginator.finished + assert paginator.current_value == 2 + assert paginator.count == 2 + + response._content = no_more_response + paginator.advance(response) + assert paginator.finished + assert paginator.count == 3 + + +def test_paginator_offset(): + """Validate paginator that uses the page offset.""" + + class _TestOffsetPaginator(BaseOffsetPaginator): + def __init__( + self, + start_value: int, + page_size: int, + records_jsonpath: str, + *args: Any, + **kwargs: Any, + ) -> None: + super().__init__(start_value, page_size, *args, **kwargs) + self._records_jsonpath = records_jsonpath + + def has_more(self, response: Response) -> bool: + """Check if response has any records. + + Args: + response: API response object. + + Returns: + Boolean flag used to indicate if the endpoint has more pages. + """ + try: + first( + extract_jsonpath( + self._records_jsonpath, + response.json(), + ) + ) + except StopIteration: + return False + + return True + + response = Response() + paginator = _TestOffsetPaginator(0, 2, "$[*]") + assert not paginator.finished + assert paginator.current_value == 0 + assert paginator.count == 0 + + response._content = b'[{"id": 1}, {"id": 2}]' + paginator.advance(response) + assert not paginator.finished + assert paginator.current_value == 2 + assert paginator.count == 1 + + response._content = b'[{"id": 3}]' + paginator.advance(response) + assert not paginator.finished + assert paginator.current_value == 4 + assert paginator.count == 2 + + response._content = b"[]" + paginator.advance(response) + assert paginator.finished + assert paginator.count == 3 + + +def test_paginator_jsonpath(): + """Validate paginator that uses JSONPath.""" + + response = Response() + paginator = JSONPathPaginator(jsonpath="$.nextPageToken") + assert not paginator.finished + assert paginator.current_value is None + assert paginator.count == 0 + + response._content = b'{"nextPageToken": "abc"}' + paginator.advance(response) + assert not paginator.finished + assert paginator.current_value == "abc" + assert paginator.count == 1 + + response._content = b'{"nextPageToken": null}' + paginator.advance(response) + assert paginator.finished + assert paginator.count == 2 + + +def test_paginator_header(): + """Validate paginator that uses response headers.""" + + key = "X-Next-Page" + response = Response() + paginator = SimpleHeaderPaginator(key=key) + assert not paginator.finished + assert paginator.current_value is None + assert paginator.count == 0 + + response.headers[key] = "abc" + paginator.advance(response) + assert not paginator.finished + assert paginator.current_value == "abc" + assert paginator.count == 1 + + response.headers[key] = None + paginator.advance(response) + assert paginator.finished + assert paginator.count == 2 + + +def test_paginator_header_links(): + """Validate paginator that uses HATEOAS links.""" + + api_hostname = "my.api.test" + resource_path = "/path/to/resource" + + response = Response() + paginator = HeaderLinkPaginator() + assert not paginator.finished + assert paginator.current_value is None + assert paginator.count == 0 + + response.headers.update( + {"Link": f"; rel=next"}, + ) + paginator.advance(response) + assert not paginator.finished + assert paginator.current_value.hostname == api_hostname + assert paginator.current_value.path == resource_path + assert paginator.current_value.query == "page=2&limit=100" + assert paginator.count == 1 + + response.headers.update( + { + "Link": ( + f";rel=next," + f";rel=back" + ) + }, + ) + paginator.advance(response) + assert not paginator.finished + assert paginator.current_value.hostname == api_hostname + assert paginator.current_value.path == resource_path + assert paginator.current_value.query == "page=3&limit=100" + assert paginator.count == 2 + + response.headers.update( + {"Link": ";rel=back"}, + ) + paginator.advance(response) + assert paginator.finished + assert paginator.count == 3 + + +def test_paginator_custom_hateoas(): + """Validate paginator that uses HATEOAS links.""" + + class _CustomHATEOASPaginator(BaseHATEOASPaginator): + def get_next_url(self, response: Response) -> str | None: + """Get a parsed HATEOAS link for the next, if the response has one.""" + + try: + return first( + extract_jsonpath( + "$.links[?(@.rel=='next')].href", + response.json(), + ) + ) + except StopIteration: + return None + + resource_path = "/path/to/resource" + + response = Response() + paginator = _CustomHATEOASPaginator() + assert not paginator.finished + assert paginator.current_value is None + assert paginator.count == 0 + + response._content = json.dumps( + { + "links": [ + { + "rel": "next", + "href": f"{resource_path}?page=2&limit=100", + } + ] + } + ).encode() + paginator.advance(response) + assert not paginator.finished + assert paginator.current_value.path == resource_path + assert paginator.current_value.query == "page=2&limit=100" + assert paginator.count == 1 + + response._content = json.dumps( + { + "links": [ + { + "rel": "next", + "href": f"{resource_path}?page=3&limit=100", + } + ] + } + ).encode() + paginator.advance(response) + assert not paginator.finished + assert paginator.current_value.path == resource_path + assert paginator.current_value.query == "page=3&limit=100" + assert paginator.count == 2 + + response._content = json.dumps({"links": []}).encode() + paginator.advance(response) + assert paginator.finished + assert paginator.count == 3 diff --git a/tests/core/test_streams.py b/tests/core/test_streams.py index 8732e1536..5bbd4b466 100644 --- a/tests/core/test_streams.py +++ b/tests/core/test_streams.py @@ -1,15 +1,18 @@ """Stream tests.""" +from __future__ import annotations + import logging -from typing import Any, Dict, Iterable, List, Optional, cast +from typing import Any, Iterable, cast import pendulum import pytest import requests from singer_sdk.helpers._classproperty import classproperty -from singer_sdk.helpers._singer import Catalog, CatalogEntry, MetadataMapping -from singer_sdk.helpers.jsonpath import _compile_jsonpath +from singer_sdk.helpers._singer import Catalog, MetadataMapping +from singer_sdk.helpers.jsonpath import _compile_jsonpath, extract_jsonpath +from singer_sdk.pagination import first from singer_sdk.streams.core import ( REPLICATION_FULL_TABLE, REPLICATION_INCREMENTAL, @@ -44,7 +47,7 @@ def __init__(self, tap: Tap): """Create a new stream.""" super().__init__(tap, schema=self.schema, name=self.name) - def get_records(self, context: Optional[dict]) -> Iterable[Dict[str, Any]]: + def get_records(self, context: dict | None) -> Iterable[dict[str, Any]]: """Generate records.""" yield {"id": 1, "value": "Egypt"} yield {"id": 2, "value": "Germany"} @@ -83,6 +86,24 @@ class RestTestStream(RESTStream): ).to_dict() replication_key = "updatedAt" + def get_next_page_token( + self, + response: requests.Response, + previous_token: str | None, + ) -> str | None: + if self.next_page_token_jsonpath: + all_matches = extract_jsonpath( + self.next_page_token_jsonpath, + response.json(), + ) + try: + return first(all_matches) + except StopIteration: + return None + + else: + return response.headers.get("X-Next-Page", None) + class GraphqlTestStream(GraphQLStream): """Test Graphql stream class.""" @@ -103,7 +124,7 @@ class SimpleTestTap(Tap): name = "test-tap" settings_jsonschema = PropertiesList(Property("start_date", DateTimeType)).to_dict() - def discover_streams(self) -> List[Stream]: + def discover_streams(self) -> list[Stream]: """List all streams.""" return [ SimpleTestStream(self), @@ -309,7 +330,7 @@ def test_stream_starting_timestamp( ], ) def test_jsonpath_rest_stream( - tap: SimpleTestTap, path: str, content: str, result: List[dict] + tap: SimpleTestTap, path: str, content: str, result: list[dict] ): """Validate records are extracted correctly from the API response.""" fake_response = requests.Response() @@ -440,7 +461,10 @@ def test_next_page_token_jsonpath( RestTestStream.next_page_token_jsonpath = path stream = RestTestStream(tap) - next_page = stream.get_next_page_token(fake_response, previous_token=None) + with pytest.warns(DeprecationWarning): + paginator = stream.get_new_paginator() + + next_page = paginator.get_next(fake_response) assert next_page == result @@ -465,7 +489,7 @@ def test_sync_costs_calculation(tap: SimpleTestTap, caplog): def calculate_test_cost( request: requests.PreparedRequest, response: requests.Response, - context: Optional[Dict], + context: dict | None, ): return {"dim1": 1, "dim2": 2} From 76eed0d4e9c6a14563867f5190fb55cac6c6a2ab Mon Sep 17 00:00:00 2001 From: Laurent Savaete Date: Thu, 1 Sep 2022 04:48:36 +0300 Subject: [PATCH 2/3] feat(taps): Add checks for primary keys, replication keys and state partitioning keys to standard tap tests (#829) * Add code validation tests to standard tests * Add extra test for replication keys * Apply suggestions from code review Co-authored-by: Edgar R. M. * Fix formatting for ci check Co-authored-by: Edgar R. M. --- singer_sdk/testing.py | 47 ++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 46 insertions(+), 1 deletion(-) diff --git a/singer_sdk/testing.py b/singer_sdk/testing.py index d2fa94d91..b0c8a926d 100644 --- a/singer_sdk/testing.py +++ b/singer_sdk/testing.py @@ -40,7 +40,52 @@ def _test_stream_connections() -> None: tap1: Tap = tap_class(config=config, parse_env_config=True) tap1.run_connection_test() - return [_test_cli_prints, _test_discovery, _test_stream_connections] + def _test_pkeys_in_schema() -> None: + """Verify that primary keys are actually in the stream's schema.""" + tap = tap_class(config=config, parse_env_config=True) + for name, stream in tap.streams.items(): + pkeys = stream.primary_keys or [] + schema_props = set(stream.schema["properties"].keys()) + for pkey in pkeys: + error_message = ( + f"Coding error in stream '{name}': " + f"primary_key '{pkey}' is missing in schema" + ) + assert pkey in schema_props, error_message + + def _test_state_partitioning_keys_in_schema() -> None: + """Verify that state partitioning keys are actually in the stream's schema.""" + tap = tap_class(config=config, parse_env_config=True) + for name, stream in tap.streams.items(): + sp_keys = stream.state_partitioning_keys or [] + schema_props = set(stream.schema["properties"].keys()) + for sp_key in sp_keys: + assert sp_key in schema_props, ( + f"Coding error in stream '{name}': state_partitioning_key " + f"'{sp_key}' is missing in schema" + ) + + def _test_replication_keys_in_schema() -> None: + """Verify that the replication key is actually in the stream's schema.""" + tap = tap_class(config=config, parse_env_config=True) + for name, stream in tap.streams.items(): + rep_key = stream.replication_key + if rep_key is None: + continue + schema_props = set(stream.schema["properties"].keys()) + assert rep_key in schema_props, ( + f"Coding error in stream '{name}': replication_key " + f"'{rep_key}' is missing in schema" + ) + + return [ + _test_cli_prints, + _test_discovery, + _test_stream_connections, + _test_pkeys_in_schema, + _test_state_partitioning_keys_in_schema, + _test_replication_keys_in_schema, + ] def get_standard_target_tests( From 1430f7c609c6ec3cc7425a84a6f15e73145783b0 Mon Sep 17 00:00:00 2001 From: "Edgar R. M" Date: Wed, 31 Aug 2022 22:28:54 -0500 Subject: [PATCH 3/3] docs: Add docs preview links to PR description (#942) --- .github/workflows/pr-preview-links.yml | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) create mode 100644 .github/workflows/pr-preview-links.yml diff --git a/.github/workflows/pr-preview-links.yml b/.github/workflows/pr-preview-links.yml new file mode 100644 index 000000000..5a7e0d326 --- /dev/null +++ b/.github/workflows/pr-preview-links.yml @@ -0,0 +1,17 @@ +name: Read the Docs Pull Request Preview + +on: + pull_request_target: + types: + - opened + +permissions: + pull-requests: write + +jobs: + pr-preview-links: + runs-on: ubuntu-latest + steps: + - uses: readthedocs/actions/preview@v1 + with: + project-slug: "meltano-sdk"