Skip to content

Commit

Permalink
🎉 Python CDK: Allow setting network adapter args on outgoing HTTP req…
Browse files Browse the repository at this point in the history
…uests (#4493)
  • Loading branch information
sherifnada authored and gl-pix committed Jul 22, 2021
1 parent 7ed6cb2 commit f02d74a
Show file tree
Hide file tree
Showing 6 changed files with 42 additions and 16 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/publish-cdk-command.yml
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ jobs:
- name: Checkout Airbyte
uses: actions/checkout@v2
- name: Build CDK Package
run: ./gradlew --no-daemon :airbyte-cdk:python:build
run: ./gradlew --no-daemon --no-build-cache :airbyte-cdk:python:build
- name: Add Failure Comment
if: github.event.inputs.comment-id && !success()
uses: peter-evans/create-or-update-comment@v1
Expand Down
3 changes: 3 additions & 0 deletions airbyte-cdk/python/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
# Changelog

## 0.1.5
Allow specifying keyword arguments to be sent on a request made by an HTTP stream: https://github.com/airbytehq/airbyte/pull/4493

## 0.1.4
Allow to use Python 3.7.0: https://github.com/airbytehq/airbyte/pull/3566

Expand Down
24 changes: 18 additions & 6 deletions airbyte-cdk/python/airbyte_cdk/sources/streams/http/http.py
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,19 @@ def request_body_json(
"""
return None

def request_kwargs(
    self,
    stream_state: Mapping[str, Any],
    stream_slice: Mapping[str, Any] = None,
    next_page_token: Mapping[str, Any] = None,
) -> Mapping[str, Any]:
    """
    Override to return a mapping of keyword arguments to be used when sending the HTTP request.

    The returned mapping is unpacked into the session's `send` call, so any option listed in
    https://docs.python-requests.org/en/latest/api/#requests.adapters.BaseAdapter.send can be returned from this method.
    These options are separate from request-level options such as headers, request params, etc., and do not conflict
    with them.

    :param stream_state: the stream state passed through from `read_records`
    :param stream_slice: the slice currently being read, if any
    :param next_page_token: the pagination token for the page about to be requested, if any
    :return: keyword arguments for the send call; the default implementation returns an empty mapping
    """
    return {}

@abstractmethod
def parse_response(
self,
Expand Down Expand Up @@ -166,13 +179,13 @@ def _create_prepared_request(
# TODO support non-json bodies
args["json"] = json

return requests.Request(**args).prepare()
return self._session.prepare_request(requests.Request(**args))

# TODO allow configuring these parameters. If we can get this into the requests library, then we can do it without the ugly exception hacks
# see https://github.com/litl/backoff/pull/122
@default_backoff_handler(max_tries=5, factor=5)
@user_defined_backoff_handler(max_tries=5)
def _send_request(self, request: requests.PreparedRequest) -> requests.Response:
def _send_request(self, request: requests.PreparedRequest, request_kwargs: Mapping[str, Any]) -> requests.Response:
"""
Wraps sending the request in rate limit and error handlers.
Expand All @@ -190,9 +203,8 @@ def _send_request(self, request: requests.PreparedRequest) -> requests.Response:
Unexpected transient exceptions use the default backoff parameters.
Unexpected persistent exceptions are not handled and will cause the sync to fail.
"""
response: requests.Response = self._session.send(request)
response: requests.Response = self._session.send(request, **request_kwargs)
if self.should_retry(response):

custom_backoff_time = self.backoff_time(response)
if custom_backoff_time:
raise UserDefinedBackoffException(backoff=custom_backoff_time, request=request, response=response)
Expand Down Expand Up @@ -224,8 +236,8 @@ def read_records(
params=self.request_params(stream_state=stream_state, stream_slice=stream_slice, next_page_token=next_page_token),
json=self.request_body_json(stream_state=stream_state, stream_slice=stream_slice, next_page_token=next_page_token),
)

response = self._send_request(request)
request_kwargs = self.request_kwargs(stream_state=stream_state, stream_slice=stream_slice, next_page_token=next_page_token)
response = self._send_request(request, request_kwargs)
yield from self.parse_response(response, stream_state=stream_state, stream_slice=stream_slice)

next_page_token = self.next_page_token(response)
Expand Down
5 changes: 5 additions & 0 deletions airbyte-cdk/python/docs/concepts/http-streams.md
Original file line number Diff line number Diff line change
Expand Up @@ -71,3 +71,8 @@ errors. It is not currently possible to specify a rate limit Airbyte should adhe
### Stream Slicing

When implementing [stream slicing](incremental-stream.md#streamstream_slices) in an `HttpStream`, each slice is equivalent to an HTTP request; the stream will make one request per element returned by the `stream_slices` function. The current slice being read is passed into every other method in `HttpStream` (e.g. `request_params`, `request_headers`, `path`, etc.) to be injected into a request. This allows `request_params`, `path`, and other functions to read the input slice and dynamically return the appropriate value.

### Network Adapter Keyword arguments
If you need to set any network-adapter keyword args on the outgoing HTTP requests, such as `allow_redirects`, `stream`, `verify`, `cert`, etc.,
override the `request_kwargs` method. Any option listed in [BaseAdapter.send](https://docs.python-requests.org/en/latest/api/#requests.adapters.BaseAdapter.send) can
be returned as a keyword argument.
11 changes: 2 additions & 9 deletions airbyte-cdk/python/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@

setup(
name="airbyte-cdk",
version="0.1.4",
version="0.1.5",
description="A framework for writing Airbyte Connectors.",
long_description=README,
long_description_content_type="text/markdown",
Expand Down Expand Up @@ -73,14 +73,7 @@
"requests",
],
python_requires=">=3.7.0",
extras_require={
"dev": [
"MyPy==0.812",
"pytest",
"pytest-cov",
"pytest-mock",
]
},
extras_require={"dev": ["MyPy==0.812", "pytest", "pytest-cov", "pytest-mock", "requests-mock"]},
entry_points={
"console_scripts": ["base-python=base_python.entrypoint:main"],
},
Expand Down
13 changes: 13 additions & 0 deletions airbyte-cdk/python/unit_tests/sources/streams/http/test_http.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@


from typing import Any, Iterable, Mapping, Optional
from unittest.mock import ANY

import pytest
import requests
Expand Down Expand Up @@ -60,6 +61,18 @@ def parse_response(
yield stubResp


def test_request_kwargs_used(mocker, requests_mock):
    """Check that the mapping returned by `request_kwargs` is forwarded as keyword arguments to the session's `send`."""
    stream = StubBasicReadHttpStream()
    request_kwargs = {"cert": None, "proxies": "google.com"}
    # Force the stream to report the kwargs under test instead of its default.
    mocker.patch.object(stream, "request_kwargs", return_value=request_kwargs)
    # Wrap rather than replace `send`: the real call still runs while its arguments are recorded.
    mocker.patch.object(stream._session, "send", wraps=stream._session.send)
    # Register the stream's base URL with requests_mock so the GET has a mocked response.
    requests_mock.register_uri("GET", stream.url_base)

    # read_records is consumed with list() so that the request is actually made.
    list(stream.read_records(sync_mode=SyncMode.full_refresh))

    stream._session.send.assert_any_call(ANY, **request_kwargs)


def test_stub_basic_read_http_stream_read_records(mocker):
stream = StubBasicReadHttpStream()
blank_response = {} # Send a blank response is fine as we ignore the response in `parse_response anyway.
Expand Down

0 comments on commit f02d74a

Please sign in to comment.