Add TCP_KEEPALIVE option to http provider #24967

Merged · 1 commit · Jul 12, 2022

10 changes: 10 additions & 0 deletions airflow/providers/http/CHANGELOG.rst
@@ -24,6 +24,16 @@
Changelog
---------

Breaking changes
~~~~~~~~~~~~~~~~

The SimpleHttpOperator, HttpSensor and HttpHook now use TCP_KEEPALIVE by default.
You can disable it by setting ``tcp_keep_alive`` to False, and you can control the keepalive behaviour
with the new ``tcp_keep_alive_*`` parameters added to the constructors of the Hook, Operator and Sensor.
With TCP_KEEPALIVE enabled, empty TCP packets are sent periodically during long periods of inactivity,
which prevents some firewalls from closing a long-running connection. This has a very small impact on
network traffic and helps keep idle or hanging connections from being closed automatically by firewalls.
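
For illustration (not part of the changelog diff), a minimal sketch of how a task might disable or tune keepalive with these parameters; the DAG id, connection id, endpoint and task ids below are hypothetical:

```python
from datetime import datetime

from airflow import DAG
from airflow.providers.http.operators.http import SimpleHttpOperator

with DAG(
    dag_id="http_keepalive_example",        # hypothetical DAG id
    start_date=datetime(2022, 7, 1),
    schedule_interval=None,
    catchup=False,
) as dag:
    # Keepalive is on by default; this task turns it off explicitly.
    no_keepalive = SimpleHttpOperator(
        task_id="call_api_without_keepalive",
        http_conn_id="my_http_conn",        # hypothetical connection id
        endpoint="v1/long-report",          # hypothetical endpoint
        method="GET",
        tcp_keep_alive=False,
    )

    # Keepalive enabled with custom probe timings.
    tuned_keepalive = SimpleHttpOperator(
        task_id="call_api_with_tuned_keepalive",
        http_conn_id="my_http_conn",
        endpoint="v1/long-report",
        method="GET",
        tcp_keep_alive_idle=60,       # seconds of idleness before the first probe
        tcp_keep_alive_count=10,      # unanswered probes before the connection is dropped
        tcp_keep_alive_interval=15,   # seconds between probes
    )
```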

3.0.0
.....

19 changes: 19 additions & 0 deletions airflow/providers/http/hooks/http.py
@@ -20,6 +20,7 @@
import requests
import tenacity
from requests.auth import HTTPBasicAuth
from requests_toolbelt.adapters.socket_options import TCPKeepAliveAdapter

from airflow.exceptions import AirflowException
from airflow.hooks.base import BaseHook
@@ -34,6 +35,11 @@ class HttpHook(BaseHook):
API url i.e https://www.google.com/ and optional authentication credentials. Default
headers can also be specified in the Extra field in json format.
:param auth_type: The auth type for the service
:param tcp_keep_alive: Enable TCP Keep Alive for the connection.
:param tcp_keep_alive_idle: The TCP Keep Alive Idle parameter (corresponds to ``socket.TCP_KEEPIDLE``).
:param tcp_keep_alive_count: The TCP Keep Alive count parameter (corresponds to ``socket.TCP_KEEPCNT``)
:param tcp_keep_alive_interval: The TCP Keep Alive interval parameter (corresponds to
``socket.TCP_KEEPINTVL``)
"""

conn_name_attr = 'http_conn_id'
@@ -46,13 +52,21 @@ def __init__(
method: str = 'POST',
http_conn_id: str = default_conn_name,
auth_type: Any = HTTPBasicAuth,
tcp_keep_alive: bool = True,
tcp_keep_alive_idle: int = 120,
tcp_keep_alive_count: int = 20,
tcp_keep_alive_interval: int = 30,
) -> None:
super().__init__()
self.http_conn_id = http_conn_id
self.method = method.upper()
self.base_url: str = ""
self._retry_obj: Callable[..., Any]
self.auth_type: Any = auth_type
self.tcp_keep_alive = tcp_keep_alive
self.keep_alive_idle = tcp_keep_alive_idle
self.keep_alive_count = tcp_keep_alive_count
self.keep_alive_interval = tcp_keep_alive_interval

# headers may be passed through directly or in the "extra" field in the connection
# definition
@@ -115,6 +129,11 @@ def run(

url = self.url_from_endpoint(endpoint)

if self.tcp_keep_alive:
keep_alive_adapter = TCPKeepAliveAdapter(
idle=self.keep_alive_idle, count=self.keep_alive_count, interval=self.keep_alive_interval
)
session.mount(url, keep_alive_adapter)
if self.method == 'GET':
# GET uses params
req = requests.Request(self.method, url, params=data, headers=headers, **request_kwargs)
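
For context (not part of the diff), this is roughly the mounting pattern the hook now applies, reproduced with plain ``requests`` and ``requests_toolbelt``; the base URL is a placeholder and the values mirror the hook's defaults:

```python
import requests
from requests_toolbelt.adapters.socket_options import TCPKeepAliveAdapter

session = requests.Session()

# Mount the keepalive adapter for a given URL prefix, mirroring what
# HttpHook.run() does when tcp_keep_alive is True.
adapter = TCPKeepAliveAdapter(idle=120, count=20, interval=30)
session.mount("https://example.com/", adapter)  # placeholder base URL

# Requests to matching URLs now go through the keepalive-enabled adapter,
# which sets SO_KEEPALIVE / TCP_KEEPIDLE / TCP_KEEPCNT / TCP_KEEPINTVL
# on the underlying sockets.
response = session.get("https://example.com/v1/ping")
print(response.status_code)
```
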
23 changes: 22 additions & 1 deletion airflow/providers/http/operators/http.py
@@ -54,6 +54,11 @@ class SimpleHttpOperator(BaseOperator):
'requests' documentation (options to modify timeout, ssl, etc.)
:param log_response: Log the response (default: False)
:param auth_type: The auth type for the service
:param tcp_keep_alive: Enable TCP Keep Alive for the connection.
:param tcp_keep_alive_idle: The TCP Keep Alive Idle parameter (corresponds to ``socket.TCP_KEEPIDLE``).
:param tcp_keep_alive_count: The TCP Keep Alive count parameter (corresponds to ``socket.TCP_KEEPCNT``)
:param tcp_keep_alive_interval: The TCP Keep Alive interval parameter (corresponds to
``socket.TCP_KEEPINTVL``)
"""

template_fields: Sequence[str] = (
@@ -78,6 +83,10 @@ def __init__(
http_conn_id: str = 'http_default',
log_response: bool = False,
auth_type: Type[AuthBase] = HTTPBasicAuth,
tcp_keep_alive: bool = True,
tcp_keep_alive_idle: int = 120,
tcp_keep_alive_count: int = 20,
tcp_keep_alive_interval: int = 30,
**kwargs: Any,
) -> None:
super().__init__(**kwargs)
@@ -91,11 +100,23 @@ def __init__(
self.extra_options = extra_options or {}
self.log_response = log_response
self.auth_type = auth_type
self.tcp_keep_alive = tcp_keep_alive
self.tcp_keep_alive_idle = tcp_keep_alive_idle
self.tcp_keep_alive_count = tcp_keep_alive_count
self.tcp_keep_alive_interval = tcp_keep_alive_interval

def execute(self, context: 'Context') -> Any:
from airflow.utils.operator_helpers import determine_kwargs

http = HttpHook(self.method, http_conn_id=self.http_conn_id, auth_type=self.auth_type)
http = HttpHook(
self.method,
http_conn_id=self.http_conn_id,
auth_type=self.auth_type,
tcp_keep_alive=self.tcp_keep_alive,
tcp_keep_alive_idle=self.tcp_keep_alive_idle,
tcp_keep_alive_count=self.tcp_keep_alive_count,
tcp_keep_alive_interval=self.tcp_keep_alive_interval,
)

self.log.info("Calling HTTP method")

1 change: 1 addition & 0 deletions airflow/providers/http/provider.yaml
@@ -38,6 +38,7 @@ dependencies:
# The 2.26.0 release of requests got rid of the chardet LGPL mandatory dependency, allowing us to
# release it as a requirement for airflow
- requests>=2.26.0
- requests_toolbelt

integrations:
- integration-name: Hypertext Transfer Protocol (HTTP)
26 changes: 23 additions & 3 deletions airflow/providers/http/sensors/http.py
@@ -67,6 +67,11 @@ def response_check(response, task_instance):
It should return True for 'pass' and False otherwise.
:param extra_options: Extra options for the 'requests' library, see the
'requests' documentation (options to modify timeout, ssl, etc.)
:param tcp_keep_alive: Enable TCP Keep Alive for the connection.
:param tcp_keep_alive_idle: The TCP Keep Alive Idle parameter (corresponds to ``socket.TCP_KEEPIDLE``).
:param tcp_keep_alive_count: The TCP Keep Alive count parameter (corresponds to ``socket.TCP_KEEPCNT``)
:param tcp_keep_alive_interval: The TCP Keep Alive interval parameter (corresponds to
``socket.TCP_KEEPINTVL``)
"""

template_fields: Sequence[str] = ('endpoint', 'request_params', 'headers')
@@ -81,6 +86,10 @@ def __init__(
headers: Optional[Dict[str, Any]] = None,
response_check: Optional[Callable[..., bool]] = None,
extra_options: Optional[Dict[str, Any]] = None,
tcp_keep_alive: bool = True,
tcp_keep_alive_idle: int = 120,
tcp_keep_alive_count: int = 20,
tcp_keep_alive_interval: int = 30,
**kwargs: Any,
) -> None:
super().__init__(**kwargs)
@@ -91,15 +100,26 @@ def __init__(
self.headers = headers or {}
self.extra_options = extra_options or {}
self.response_check = response_check

self.hook = HttpHook(method=method, http_conn_id=http_conn_id)
@pankajkoti (Member) commented on Jul 14, 2022:

@potiuk Is there a specific reason we want to remove this assignment to the instance and only create the hook later, in the poke method, rather than keeping it as an instance variable?
The async operator relies on this instance variable: https://github.com/astronomer/astronomer-providers/blob/main/astronomer/providers/http/sensors/http.py#L67 and the RC is failing for the current async implementation.

Should we create a new hook instance in the derived class?

A member replied:

This is the error we get in our DAG run with the RC:

Traceback (most recent call last):
  File "/usr/local/lib/python3.9/site-packages/astronomer/providers/http/sensors/http.py", line 67, in execute
    method=self.hook.method,  # TODO: Fix this to directly get method from ctor
AttributeError: 'HttpSensorAsync' object has no attribute 'hook'

A member replied:

I have created a PR to reinstantiate the hook in the derived class: astronomer/astronomer-providers#515

@potiuk (Member Author) replied on Jul 14, 2022:

Well, I had not thought much about this use case when I added it - it was just applying the general approach we usually follow, where Hooks are created in the execute() method rather than in the constructor.

But if there are good reasons why it should not be moved, I can remove that one from the batch, make an RC2, and restore the self.hook.

I think that with the async operators approach we are missing a bit of best practice:

  • should we create the Hook in init()?
  • should we keep the Hook as an Operator attribute (self.hook)?
  • or should we recreate the hook every time we come back from the deferred state?

I think the last option is the most "reasonable", because the first two might make the (incorrect) assumption that the same hook object is available during the whole lifecycle of all the operator's methods, and if the Hook is not "stateless", it might break when the operator is deferred (because the Hook will be recreated when the operator resumes from the deferred state). If you keep the hook as a self.hook property, you might (unconsciously) rely on it being the same object across the various methods of a deferred operator.

WDYT? Does it sound like a good practice? Also others, @dstandish maybe? I think we had some discussion about this in the past (I can't recall exactly).
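
To make the third option above concrete, here is a minimal editorial sketch of a deferrable-style operator that recreates its hook on demand instead of storing it on ``self``; the class and method names are illustrative only and not part of this PR:

```python
from airflow.models.baseoperator import BaseOperator
from airflow.providers.http.hooks.http import HttpHook


class IllustrativeDeferrableHttpOperator(BaseOperator):
    """Illustrative only: keeps configuration, not the hook, on ``self``."""

    def __init__(self, *, http_conn_id: str = "http_default", method: str = "GET", **kwargs):
        super().__init__(**kwargs)
        # Store only plain, serializable configuration on the instance.
        self.http_conn_id = http_conn_id
        self.method = method

    def _get_hook(self) -> HttpHook:
        # Recreate the hook wherever it is needed, so resuming from a
        # deferred state never relies on a hook object surviving in memory.
        return HttpHook(method=self.method, http_conn_id=self.http_conn_id)

    def execute(self, context):
        hook = self._get_hook()
        ...  # start the work, possibly defer()

    def execute_complete(self, context, event=None):
        hook = self._get_hook()
        ...  # finish the work after resuming from the trigger
```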

@pankajkoti (Member) replied on Jul 14, 2022:

Makes sense to me. In that case, I think we can create a clean-up story on our end to adhere to the best practices once we conclude this discussion. We do not need an RC2 for that, then; and so that the operator continues to work, I will, for the time being, put the hack of creating the hook in our operator's init() in our PR.

Also, looping in @kaxil @bharanidharan14 @rajaths010494 @pankajastro for the discussion.

@potiuk (Member Author) replied:

That sounds good to me. It would be great to contribute such "best practices for writing Deferrable Operators" to the community after all the experience Astronomer has had writing theirs. It would be a great contribution :).

Later on we could even try to automate some of those "best practices" in pre-commits for any future community async operators. It sounds like a doable task to ast-parse an operator and, if it is a Deferrable one, make sure no self.hook is used.
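
As a rough, hypothetical sketch of what such an ast-based pre-commit check could look like (nothing here is an existing hook or part of this PR; the class-name heuristic is an assumption):

```python
import ast
import sys


def find_self_hook_assignments(source: str, filename: str = "<module>") -> list:
    """Return warnings for ``self.hook = ...`` inside async/deferrable-looking classes."""
    problems = []
    tree = ast.parse(source, filename=filename)
    for cls in [n for n in ast.walk(tree) if isinstance(n, ast.ClassDef)]:
        if not (cls.name.endswith("Async") or "Deferrable" in cls.name):
            continue  # heuristic: only inspect async/deferrable operators
        for node in ast.walk(cls):
            if isinstance(node, ast.Assign):
                for target in node.targets:
                    if (
                        isinstance(target, ast.Attribute)
                        and target.attr == "hook"
                        and isinstance(target.value, ast.Name)
                        and target.value.id == "self"
                    ):
                        problems.append(f"{filename}:{node.lineno}: {cls.name} assigns self.hook")
    return problems


if __name__ == "__main__":
    # Usage: python check_self_hook.py path/to/operator.py ...
    for path in sys.argv[1:]:
        with open(path, encoding="utf-8") as f:
            for warning in find_self_hook_assignments(f.read(), path):
                print(warning)
```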

self.tcp_keep_alive = tcp_keep_alive
self.tcp_keep_alive_idle = tcp_keep_alive_idle
self.tcp_keep_alive_count = tcp_keep_alive_count
self.tcp_keep_alive_interval = tcp_keep_alive_interval

def poke(self, context: 'Context') -> bool:
from airflow.utils.operator_helpers import determine_kwargs

hook = HttpHook(
method=self.method,
http_conn_id=self.http_conn_id,
tcp_keep_alive=self.tcp_keep_alive,
tcp_keep_alive_idle=self.tcp_keep_alive_idle,
tcp_keep_alive_count=self.tcp_keep_alive_count,
tcp_keep_alive_interval=self.tcp_keep_alive_interval,
)

self.log.info('Poking: %s', self.endpoint)
try:
response = self.hook.run(
response = hook.run(
self.endpoint,
data=self.request_params,
headers=self.headers,
3 changes: 2 additions & 1 deletion generated/provider_dependencies.json
@@ -376,7 +376,8 @@
},
"http": {
"deps": [
"requests>=2.26.0"
"requests>=2.26.0",
"requests_toolbelt"
],
"cross-providers-deps": []
},
38 changes: 38 additions & 0 deletions tests/providers/http/hooks/test_http.py
@@ -19,13 +19,15 @@
import os
import unittest
from collections import OrderedDict
from http import HTTPStatus
from unittest import mock

import pytest
import requests
import requests_mock
import tenacity
from parameterized import parameterized
from requests.adapters import Response

from airflow.exceptions import AirflowException
from airflow.models import Connection
@@ -370,4 +372,40 @@ def test_connection_failure(self, m):
assert msg == '500:NOT_OK'


class TestKeepAlive:
def test_keep_alive_enabled(self):
with mock.patch(
'airflow.hooks.base.BaseHook.get_connection', side_effect=get_airflow_connection_with_port
), mock.patch(
'requests_toolbelt.adapters.socket_options.TCPKeepAliveAdapter.send'
) as tcp_keep_alive_send, mock.patch(
'requests.adapters.HTTPAdapter.send'
) as http_send:
hook = HttpHook(method='GET')
response = Response()
response.status_code = HTTPStatus.OK
tcp_keep_alive_send.return_value = response
http_send.return_value = response
hook.run('v1/test')
tcp_keep_alive_send.assert_called()
http_send.assert_not_called()

def test_keep_alive_disabled(self):
with mock.patch(
'airflow.hooks.base.BaseHook.get_connection', side_effect=get_airflow_connection_with_port
), mock.patch(
'requests_toolbelt.adapters.socket_options.TCPKeepAliveAdapter.send'
) as tcp_keep_alive_send, mock.patch(
'requests.adapters.HTTPAdapter.send'
) as http_send:
hook = HttpHook(method='GET', tcp_keep_alive=False)
response = Response()
response.status_code = HTTPStatus.OK
tcp_keep_alive_send.return_value = response
http_send.return_value = response
hook.run('v1/test')
tcp_keep_alive_send.assert_not_called()
http_send.assert_called()


send_email_test = mock.Mock()
9 changes: 6 additions & 3 deletions tests/providers/http/sensors/test_http.py
@@ -169,18 +169,18 @@ def resp_check(_):
poke_interval=1,
)

with mock.patch.object(task.hook.log, 'error') as mock_errors:
with mock.patch('airflow.providers.http.hooks.http.HttpHook.log') as mock_log:
with pytest.raises(AirflowSensorTimeout):
task.execute(None)

assert mock_errors.called
assert mock_log.error.called
calls = [
mock.call('HTTP error: %s', 'Not Found'),
mock.call("This endpoint doesn't exist"),
mock.call('HTTP error: %s', 'Not Found'),
mock.call("This endpoint doesn't exist"),
]
mock_errors.assert_has_calls(calls)
mock_log.error.assert_has_calls(calls)


class FakeSession:
@@ -200,6 +200,9 @@ def prepare_request(self, request):
def merge_environment_settings(self, _url, **kwargs):
return kwargs

def mount(self, prefix, adapter):
pass


class TestHttpOpSensor(unittest.TestCase):
def setUp(self):