Add SimpleAPIExtractor #58

Merged
merged 2 commits into from
Aug 10, 2023
58 changes: 58 additions & 0 deletions docs/docs/reference/extractors.md
@@ -144,6 +144,64 @@ If the file is larger than the specified amount, it will be downloaded to a temp
If the file is smaller than the specified amount, it will be downloaded to memory and read from there.
The default value is 5 MB.

## `SimpleApiExtractor`

The `SimpleApiExtractor` class represents an extractor that reads records from a simple API. It takes a single URL as
input and yields the records read from the API. The API must return a JSON array of objects, either directly or as the
value of a specified key.

For example, if the API returns the following JSON:

```json
{
  "people": [
    {
      "name": "John Doe",
      "age": 42
    },
    {
      "name": "Jane Doe",
      "age": 42
    }
  ]
}
```

Then the extractor can be configured as follows:

```yaml
- implementation: nodestream.pipeline.extractors:SimpleApiExtractor
  arguments:
    url: https://example.com/people
    yield_from: people
```

If the API returns a JSON array directly, then the `yield_from` argument can be omitted.
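
The selection rule described above can be sketched in plain Python. This is only an illustration: `select_records` is a hypothetical helper, not part of nodestream, and it assumes the payload has already been decoded from JSON.

```python
import json
from typing import Any, List, Optional


def select_records(payload: Any, yield_from: Optional[str] = None) -> List[dict]:
    """Hypothetical helper: pick the record list out of a decoded JSON payload."""
    # With a key, the records are nested under it; without one,
    # the payload itself is expected to be the array of records.
    if yield_from:
        return payload[yield_from]
    return payload


nested = json.loads(
    '{"people": [{"name": "John Doe", "age": 42}, {"name": "Jane Doe", "age": 42}]}'
)
flat = json.loads('[{"name": "John Doe", "age": 42}]')

people = select_records(nested, yield_from="people")
direct = select_records(flat)
```

In this sketch a missing key raises `KeyError`, so a misconfigured `yield_from` fails loudly rather than yielding nothing.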

The `SimpleApiExtractor` can paginate through the API until it reaches the end, provided the API supports
limit-offset style pagination through a query parameter.
By default, no pagination is performed. To enable it, set the `offset_query_param` argument to the name of the query parameter that carries the offset.
Each follow-up request sends the number of records read so far as that parameter's value, and extraction stops when a response contains no records.

For example, if the API reads the offset from a `page` query parameter, then the extractor can be configured as follows:

```yaml
- implementation: nodestream.pipeline.extractors:SimpleApiExtractor
  arguments:
    url: https://example.com/people
    offset_query_param: page
```
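
To make the offset arithmetic concrete, here is a minimal synchronous sketch of limit-offset pagination. It is not the extractor's implementation: `paginate`, `fake_fetch`, `DATA`, and `PAGE_SIZE` are hypothetical names, and the fake API is assumed to return at most two records per request.

```python
from typing import Callable, Dict, Iterator, List, Optional

Params = Optional[Dict[str, int]]


def paginate(fetch: Callable[[Params], List[dict]], offset_query_param: str) -> Iterator[dict]:
    """Hypothetical sketch: request pages until an empty one comes back."""
    params: Params = None  # the first request carries no offset
    records_so_far = 0
    while True:
        records = fetch(params)
        if not records:
            break  # an empty page marks the end of the data
        yield from records
        records_so_far += len(records)
        # The next request asks for everything after the records already read.
        params = {offset_query_param: records_so_far}


# A fake API (assumption): five records, served two at a time.
DATA = [{"id": i} for i in range(5)]
PAGE_SIZE = 2
seen_params: List[Params] = []


def fake_fetch(params: Params) -> List[dict]:
    seen_params.append(params)
    offset = 0 if params is None else params["page"]
    return DATA[offset : offset + PAGE_SIZE]


results = list(paginate(fake_fetch, "page"))
```

Note that the parameter value is a record count, not a page number, so an API that expects `page=2` to mean "the second page" would not be read correctly by this scheme.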

You can also specify headers to be sent with the request using the `headers` argument.

```yaml
- implementation: nodestream.pipeline.extractors:SimpleApiExtractor
  arguments:
    url: https://example.com/people
    headers:
      x-api-key: !env MY_API_KEY
```


## `TimeToLiveConfigurationExtractor`

"Extracts" time to live configurations from the file and yields them one at a time to the graph database writer.
43 changes: 43 additions & 0 deletions nodestream/pipeline/extractors/apis.py
@@ -0,0 +1,43 @@
from typing import Dict, Optional

from httpx import AsyncClient

from .extractor import Extractor


class SimpleApiExtractor(Extractor):
    def __init__(
        self,
        url: str,
        yield_from: Optional[str] = None,
        offset_query_param: Optional[str] = None,
        headers: Optional[Dict[str, str]] = None,
    ):
        self.url = url
        self.offset_query_param = offset_query_param
        self.headers = headers
        self.yield_from = yield_from

    @property
    def does_pagination(self) -> bool:
        return self.offset_query_param is not None

    async def extract_records(self):
        should_continue = True
        records_so_far = 0
        params = None

        async with AsyncClient() as client:
            while should_continue:
                result = await client.get(self.url, headers=self.headers, params=params)
                result.raise_for_status()
                records = result.json()
                if self.yield_from:
                    records = records[self.yield_from]

                for item in records:
                    yield item

                should_continue = len(records) > 0 and self.does_pagination
                records_so_far += len(records)
                params = {self.offset_query_param: records_so_far}
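
Because `extract_records` is an async generator, callers consume it with `async for`. The sketch below shows that consumption pattern without any network access. `StubExtractor` and `collect` are hypothetical names introduced only for illustration; the stub stands in for the real class and simply replays pages it was given.

```python
import asyncio
from typing import AsyncIterator, List


class StubExtractor:
    """Hypothetical stand-in exposing the same async-generator interface."""

    def __init__(self, pages: List[List[dict]]):
        self.pages = pages

    async def extract_records(self) -> AsyncIterator[dict]:
        # Yield records one at a time, page by page, without any network I/O.
        for page in self.pages:
            for item in page:
                yield item


async def collect(extractor) -> List[dict]:
    # An async comprehension drains the generator in order.
    return [record async for record in extractor.extract_records()]


stub = StubExtractor([[{"id": 1}], [{"id": 2}], []])
collected = asyncio.run(collect(stub))
```

The same `collect` coroutine should work with any object that offers an `extract_records` async generator, which is the shape the unit tests in this change rely on.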
46 changes: 46 additions & 0 deletions tests/unit/pipeline/extractors/test_apis.py
@@ -0,0 +1,46 @@
import pytest
from hamcrest import assert_that, equal_to

from nodestream.pipeline.extractors.apis import SimpleApiExtractor

TODO_API = "https://jsonplaceholder.typicode.com/todos"
TODOS_RESPONSE = [
    {"id": 1, "title": "delectus aut autem", "completed": False},
    {"id": 2, "title": "quis ut nam facilis et officia qui", "completed": True},
]


@pytest.mark.asyncio
async def test_api_no_pagination_no_nesting(httpx_mock):
    httpx_mock.add_response(url=TODO_API, json=TODOS_RESPONSE)
    subject = SimpleApiExtractor(TODO_API)
    results = [r async for r in subject.extract_records()]
    assert_that(results, equal_to(TODOS_RESPONSE))


@pytest.mark.asyncio
async def test_api_no_pagination_with_nesting(httpx_mock):
    httpx_mock.add_response(url=TODO_API, json={"todos": TODOS_RESPONSE})
    subject = SimpleApiExtractor(TODO_API, yield_from="todos")
    results = [r async for r in subject.extract_records()]
    assert_that(results, equal_to(TODOS_RESPONSE))


@pytest.mark.asyncio
async def test_api_pagination_no_nesting(httpx_mock):
    httpx_mock.add_response(url=TODO_API, json=TODOS_RESPONSE[:1])
    httpx_mock.add_response(url=TODO_API + "?o=1", json=TODOS_RESPONSE[1:])
    httpx_mock.add_response(url=TODO_API + "?o=2", json=[])
    subject = SimpleApiExtractor(TODO_API, offset_query_param="o")
    results = [r async for r in subject.extract_records()]
    assert_that(results, equal_to(TODOS_RESPONSE))


@pytest.mark.asyncio
async def test_api_pagination_with_nesting(httpx_mock):
    httpx_mock.add_response(url=TODO_API, json={"todos": TODOS_RESPONSE[:1]})
    httpx_mock.add_response(url=TODO_API + "?o=1", json={"todos": TODOS_RESPONSE[1:]})
    httpx_mock.add_response(url=TODO_API + "?o=2", json={"todos": []})
    subject = SimpleApiExtractor(TODO_API, offset_query_param="o", yield_from="todos")
    results = [r async for r in subject.extract_records()]
    assert_that(results, equal_to(TODOS_RESPONSE))