Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Airflow 16364] Add conn_timeout and cmd_timeout params to SSHOperator; add conn_timeout param to SSHHook #17236

Merged
merged 70 commits into from
Sep 10, 2021
Merged
Show file tree
Hide file tree
Changes from 46 commits
Commits
Show all changes
70 commits
Select commit Hold shift + click to select a range
a0a9720
Issue 16363 - add conn_timeout and cmd_timeout params to SSHOperator;…
fatmumuhomer Jul 25, 2021
d3f0052
Update SSHOperator tests for new conn_timeout and cmd_timeout params
fatmumuhomer Jul 26, 2021
6a49325
Merge branch 'apache:main' into airflow-16364
fatmumuhomer Jul 26, 2021
9784e56
Issue 16363 - add conn_timeout and cmd_timeout params to SSHOperator;…
fatmumuhomer Jul 25, 2021
dc2d15e
Update SSHOperator tests for new conn_timeout and cmd_timeout params
fatmumuhomer Jul 26, 2021
450046b
Merge branch 'apache:main' into airflow-16364
fatmumuhomer Jul 26, 2021
02abeef
Merge branch 'airflow-16364' of https://github.com/fatmumuhomer/airfl…
fatmumuhomer Jul 26, 2021
e2b67e6
Removed commented TODO for SSHHook changes
fatmumuhomer Jul 26, 2021
250156f
Merge branch 'apache:main' into airflow-16364
fatmumuhomer Jul 26, 2021
b72d9be
Fix static checks for operators/test_ssh.py and hooks/test_ssh.py
fatmumuhomer Jul 26, 2021
3741901
Merge branch 'airflow-16364' of https://github.com/fatmumuhomer/airfl…
fatmumuhomer Jul 26, 2021
212ad15
Merge branch 'apache:main' into airflow-16364
fatmumuhomer Jul 26, 2021
7f5e66d
Merge branch 'apache:main' into airflow-16364
fatmumuhomer Jul 27, 2021
f59e12b
Merge branch 'apache:main' into airflow-16364
fatmumuhomer Jul 28, 2021
af3e425
Issue 16363 - add conn_timeout and cmd_timeout params to SSHOperator;…
fatmumuhomer Jul 25, 2021
9904115
Update SSHOperator tests for new conn_timeout and cmd_timeout params
fatmumuhomer Jul 26, 2021
5231613
Removed commented TODO for SSHHook changes
fatmumuhomer Jul 26, 2021
f339582
Fix static checks for operators/test_ssh.py and hooks/test_ssh.py
fatmumuhomer Jul 26, 2021
4d2d729
Merge branch 'airflow-16364' of https://github.com/fatmumuhomer/airfl…
fatmumuhomer Jul 28, 2021
75742a6
Merge branch 'apache:main' into airflow-16364
fatmumuhomer Jul 29, 2021
98dc572
Set deprecated timeout param default value to None; add more tests
fatmumuhomer Jul 29, 2021
2752c60
Merge branch 'airflow-16364' of https://github.com/fatmumuhomer/airfl…
fatmumuhomer Jul 29, 2021
9553020
Remove unnecessary extra_options.get() calls for timeout and conn_tim…
fatmumuhomer Jul 29, 2021
70ff345
Merge branch 'apache:main' into airflow-16364
fatmumuhomer Jul 29, 2021
c5fbae6
Merge branch 'apache:main' into airflow-16364
fatmumuhomer Jul 30, 2021
40e7f43
Add reference to new timeout parameters in docstrings for deprecated …
fatmumuhomer Aug 1, 2021
86c731a
Move timeout docstring text to 2nd line to keep line within 110 chara…
fatmumuhomer Aug 2, 2021
62ef8c5
Favor conn_timeout parameter over extra options
fatmumuhomer Aug 2, 2021
22b20db
Merge branch 'apache:main' into airflow-16364
fatmumuhomer Aug 3, 2021
23752a4
Merge branch 'apache:main' into airflow-16364
fatmumuhomer Aug 4, 2021
0434451
Merge branch 'apache:main' into airflow-16364
fatmumuhomer Aug 5, 2021
87bbbaa
Add conn_timeout to SSH Connection docs and mark timeout as deprecated
fatmumuhomer Aug 5, 2021
c983284
Merge branch 'apache:main' into airflow-16364
fatmumuhomer Aug 5, 2021
31990d0
Merge branch 'apache:main' into airflow-16364
fatmumuhomer Aug 6, 2021
af57202
Merge branch 'apache:main' into airflow-16364
fatmumuhomer Aug 15, 2021
9a1f262
Merge branch 'apache:main' into airflow-16364
fatmumuhomer Aug 17, 2021
9debf0f
Merge branch 'apache:main' into airflow-16364
fatmumuhomer Aug 21, 2021
6a93fa7
Merge branch 'apache:main' into airflow-16364
fatmumuhomer Aug 24, 2021
df6aef8
Fix issue with defaults for timeouts in SSHHook/SSHOperator; add para…
fatmumuhomer Aug 24, 2021
d92bf2c
Fix spelling mistake in ssh.rst
fatmumuhomer Aug 24, 2021
0b8cdf6
Merge branch 'apache:main' into airflow-16364
fatmumuhomer Sep 1, 2021
a5ad8c7
Move details for conn_timeout to SSHHook docstring instead of ssh.rst
fatmumuhomer Sep 1, 2021
051710f
Merge branch 'apache:main' into airflow-16364
fatmumuhomer Sep 1, 2021
f992016
Merge branch 'apache:main' into airflow-16364
fatmumuhomer Sep 3, 2021
55cbfbc
Remove commented out line
fatmumuhomer Sep 3, 2021
56dbb56
Merge branch 'airflow-16364' of https://github.com/fatmumuhomer/airfl…
fatmumuhomer Sep 3, 2021
60e74b1
Issue 16363 - add conn_timeout and cmd_timeout params to SSHOperator;…
fatmumuhomer Jul 25, 2021
b3f73e1
Update SSHOperator tests for new conn_timeout and cmd_timeout params
fatmumuhomer Jul 26, 2021
0c1c1ef
Removed commented TODO for SSHHook changes
fatmumuhomer Jul 26, 2021
e51910f
Fix static checks for operators/test_ssh.py and hooks/test_ssh.py
fatmumuhomer Jul 26, 2021
05b62d9
Set deprecated timeout param default value to None; add more tests
fatmumuhomer Jul 29, 2021
8347232
Remove unnecessary extra_options.get() calls for timeout and conn_tim…
fatmumuhomer Jul 29, 2021
cb5c301
Add reference to new timeout parameters in docstrings for deprecated …
fatmumuhomer Aug 1, 2021
df5a5cd
Move timeout docstring text to 2nd line to keep line within 110 chara…
fatmumuhomer Aug 2, 2021
b7d0115
Favor conn_timeout parameter over extra options
fatmumuhomer Aug 2, 2021
5c07819
Add conn_timeout to SSH Connection docs and mark timeout as deprecated
fatmumuhomer Aug 5, 2021
d287e4d
Fix issue with defaults for timeouts in SSHHook/SSHOperator; add para…
fatmumuhomer Aug 24, 2021
8399a81
Fix spelling mistake in ssh.rst
fatmumuhomer Aug 24, 2021
fa80b3a
Move details for conn_timeout to SSHHook docstring instead of ssh.rst
fatmumuhomer Sep 1, 2021
a68185d
Chart: Make cleanup cronjob cmd/args configurable (#17970)
jedcunningham Sep 1, 2021
e177208
Delete unnecessary parameters in EKSPodOperator (#17960)
mik-laj Sep 2, 2021
a5f7249
Remove default_args pattern + added get_current_context() use for Cor…
josh-fell Sep 2, 2021
e9513b2
Fix constraint generation properly (#17964)
potiuk Sep 2, 2021
1e8ce4f
Fix max_active_runs not allowing moving of queued dagruns to running …
ephraimbuddy Sep 2, 2021
ff925ae
Add DAG run endpoint for marking a dagrun success or failed(#17839)
bbenshalom Sep 2, 2021
6822833
Fix blank dag dependencies view (#17990)
bbovenzi Sep 2, 2021
81cfaa1
Serialize the template_ext attribute to show it in UI (#17985)
BasPH Sep 2, 2021
40f3cfe
Remove commented out line
fatmumuhomer Sep 3, 2021
1872901
Merge branch 'airflow-16364' of https://github.com/fatmumuhomer/airfl…
fatmumuhomer Sep 9, 2021
99bcf65
Merge branch 'main' of https://github.com/apache/airflow into airflow…
fatmumuhomer Sep 9, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 36 additions & 4 deletions airflow/providers/ssh/hooks/ssh.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@
except ImportError:
from getpass import getuser

TIMEOUT_DEFAULT = 10


class SSHHook(BaseHook):
"""
Expand All @@ -56,7 +58,12 @@ class SSHHook(BaseHook):
:type key_file: str
:param port: port of remote host to connect (Default is paramiko SSH_PORT)
:type port: int
:param timeout: timeout for the attempt to connect to the remote_host.
:param conn_timeout: timeout (in seconds) for the attempt to connect to the remote_host.
The default is 10 seconds. If provided, it will replace the `conn_timeout` which was
predefined in the connection of `ssh_conn_id`.
:type conn_timeout: int
:param timeout: (Deprecated). timeout for the attempt to connect to the remote_host.
jedcunningham marked this conversation as resolved.
Show resolved Hide resolved
Use conn_timeout instead.
:type timeout: int
:param keepalive_interval: send a keepalive packet to remote host every
keepalive_interval seconds
Expand Down Expand Up @@ -101,7 +108,8 @@ def __init__(
password: Optional[str] = None,
key_file: Optional[str] = None,
port: Optional[int] = None,
timeout: int = 10,
timeout: Optional[int] = None,
conn_timeout: Optional[int] = None,
keepalive_interval: int = 30,
) -> None:
super().__init__()
Expand All @@ -113,6 +121,7 @@ def __init__(
self.pkey = None
self.port = port
self.timeout = timeout
self.conn_timeout = conn_timeout
self.keepalive_interval = keepalive_interval

# Default values, overridable from Connection
Expand All @@ -137,6 +146,7 @@ def __init__(
self.remote_host = conn.host
if self.port is None:
self.port = conn.port

if conn.extra is not None:
extra_options = conn.extra_dejson
if "key_file" in extra_options and self.key_file is None:
Expand All @@ -148,7 +158,17 @@ def __init__(
self.pkey = self._pkey_from_private_key(private_key, passphrase=private_key_passphrase)

if "timeout" in extra_options:
self.timeout = int(extra_options["timeout"], 10)
warnings.warn(
'Extra option `timeout` is deprecated.'
'Please use `conn_timeout` instead.'
'The old option `timeout` will be removed in a future version.',
DeprecationWarning,
stacklevel=2,
)
self.timeout = int(extra_options['timeout'])

if "conn_timeout" in extra_options and self.conn_timeout is None:
self.conn_timeout = int(extra_options['conn_timeout'])

if "compress" in extra_options and str(extra_options["compress"]).lower() == 'false':
self.compress = False
Expand Down Expand Up @@ -185,6 +205,18 @@ def __init__(
self.host_key = key_constructor(data=decoded_host_key)
self.no_host_key_check = False

if self.timeout:
uranusjr marked this conversation as resolved.
Show resolved Hide resolved
warnings.warn(
'Parameter `timeout` is deprecated.'
'Please use `conn_timeout` instead.'
'The old option `timeout` will be removed in a future version.',
DeprecationWarning,
stacklevel=1,
)

if self.conn_timeout is None:
self.conn_timeout = self.timeout if self.timeout else TIMEOUT_DEFAULT

if self.pkey and self.key_file:
raise AirflowException(
"Params key_file and private_key both provided. Must provide no more than one."
Expand Down Expand Up @@ -253,7 +285,7 @@ def get_conn(self) -> paramiko.SSHClient:
connect_kwargs = dict(
hostname=self.remote_host,
username=self.username,
timeout=self.timeout,
timeout=self.conn_timeout,
compress=self.compress,
port=self.port,
sock=self.host_proxy,
Expand Down
37 changes: 32 additions & 5 deletions airflow/providers/ssh/operators/ssh.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
# specific language governing permissions and limitations
# under the License.

import warnings
from base64 import b64encode
from select import select
from typing import Optional, Union
Expand All @@ -25,6 +26,8 @@
from airflow.models import BaseOperator
from airflow.providers.ssh.hooks.ssh import SSHHook

CMD_TIMEOUT = 10


class SSHOperator(BaseOperator):
"""
Expand All @@ -43,7 +46,14 @@ class SSHOperator(BaseOperator):
:type remote_host: str
:param command: command to execute on remote host. (templated)
:type command: str
:param timeout: timeout (in seconds) for executing the command. The default is 10 seconds.
:param conn_timeout: timeout (in seconds) for maintaining the connection. The default is 10 seconds.
jedcunningham marked this conversation as resolved.
Show resolved Hide resolved
Nullable. If provided, it will replace the `conn_timeout` which was
predefined in the connection of `ssh_conn_id`.
:type conn_timeout: int
:param cmd_timeout: timeout (in seconds) for executing the command. The default is 10 seconds.
:type cmd_timeout: int
:param timeout: (deprecated) timeout (in seconds) for executing the command. The default is 10 seconds.
jedcunningham marked this conversation as resolved.
Show resolved Hide resolved
Use conn_timeout and cmd_timeout parameters instead.
:type timeout: int
:param environment: a dict of shell environment variables. Note that the
server will reject them silently if `AcceptEnv` is not set in SSH config.
Expand All @@ -66,7 +76,9 @@ def __init__(
ssh_conn_id: Optional[str] = None,
remote_host: Optional[str] = None,
command: Optional[str] = None,
timeout: int = 10,
timeout: Optional[int] = None,
conn_timeout: Optional[int] = None,
cmd_timeout: Optional[int] = None,
environment: Optional[dict] = None,
get_pty: bool = False,
**kwargs,
Expand All @@ -77,9 +89,24 @@ def __init__(
self.remote_host = remote_host
self.command = command
self.timeout = timeout
self.conn_timeout = conn_timeout
self.cmd_timeout = cmd_timeout
if self.conn_timeout is None and self.timeout:
self.conn_timeout = self.timeout
if self.cmd_timeout is None:
self.cmd_timeout = self.timeout if self.timeout else CMD_TIMEOUT
self.environment = environment
self.get_pty = (self.command.startswith('sudo') or get_pty) if self.command else get_pty

if self.timeout:
warnings.warn(
'Parameter `timeout` is deprecated.'
'Please use `conn_timeout` and `cmd_timeout` instead.'
'The old option `timeout` will be removed in a future version.',
DeprecationWarning,
stacklevel=1,
)

def execute(self, context) -> Union[bytes, str, bool]:
try:
if self.ssh_conn_id:
Expand All @@ -89,7 +116,7 @@ def execute(self, context) -> Union[bytes, str, bool]:
self.log.info(
"ssh_hook is not provided or invalid. Trying ssh_conn_id to create SSHHook."
)
self.ssh_hook = SSHHook(ssh_conn_id=self.ssh_conn_id, timeout=self.timeout)
self.ssh_hook = SSHHook(ssh_conn_id=self.ssh_conn_id, conn_timeout=self.conn_timeout)

if not self.ssh_hook:
raise AirflowException("Cannot operate without ssh_hook or ssh_conn_id.")
Expand All @@ -112,7 +139,7 @@ def execute(self, context) -> Union[bytes, str, bool]:
stdin, stdout, stderr = ssh_client.exec_command(
command=self.command,
get_pty=self.get_pty,
timeout=self.timeout,
timeout=self.cmd_timeout,
environment=self.environment,
)
# get channels
Expand All @@ -133,7 +160,7 @@ def execute(self, context) -> Union[bytes, str, bool]:

# read from both stdout and stderr
while not channel.closed or channel.recv_ready() or channel.recv_stderr_ready():
readq, _, _ = select([channel], [], [], self.timeout)
readq, _, _ = select([channel], [], [], self.cmd_timeout)
for recv in readq:
if recv.recv_ready():
line = stdout.channel.recv(len(recv.in_buffer))
Expand Down
7 changes: 4 additions & 3 deletions docs/apache-airflow-providers-ssh/connections/ssh.rst
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,8 @@ Extra (optional)
* ``key_file`` - Full Path of the private SSH Key file that will be used to connect to the remote_host.
* ``private_key`` - Content of the private key used to connect to the remote_host.
* ``private_key_passphrase`` - Content of the private key passphrase used to decrypt the private key.
* ``timeout`` - An optional timeout (in seconds) for the TCP connect. Default is ``10``.
* ``conn_timeout`` - An optional timeout (in seconds) for the TCP connect. Default is ``10``.
* ``timeout`` - Deprecated - use conn_timeout instead.
* ``compress`` - ``true`` to ask the remote client/server to compress traffic; ``false`` to refuse compression. Default is ``true``.
* ``no_host_key_check`` - Set to ``false`` to restrict connecting to hosts with no entries in ``~/.ssh/known_hosts`` (Hosts file). This provides maximum protection against trojan horse attacks, but can be troublesome when the ``/etc/ssh/ssh_known_hosts`` file is poorly maintained or connections to new hosts are frequently made. This option forces the user to manually add all new hosts. Default is ``true``, ssh will automatically add new host keys to the user known hosts files.
* ``allow_host_key_change`` - Set to ``true`` if you want to allow connecting to hosts that has host key changed or when you get 'REMOTE HOST IDENTIFICATION HAS CHANGED' error. This wont protect against Man-In-The-Middle attacks. Other possible solution is to remove the host entry from ``~/.ssh/known_hosts`` file. Default is ``false``.
Expand All @@ -60,7 +61,7 @@ Extra (optional)

{
"key_file": "/home/airflow/.ssh/id_rsa",
"timeout": "10",
"conn_timeout": "10",
"compress": "false",
"look_for_keys": "false",
"allow_host_key_change": "false",
Expand All @@ -75,7 +76,7 @@ Extra (optional)

.. code-block:: bash

export AIRFLOW_CONN_MAIN_SERVER='ssh://user:pass@localhost:22?timeout=10&compress=false&no_host_key_check=false&allow_host_key_change=true&key_file=%2Fhome%2Fairflow%2F.ssh%2Fid_rsa'
export AIRFLOW_CONN_MAIN_SERVER='ssh://user:pass@localhost:22?conn_timeout=10&compress=false&no_host_key_check=false&allow_host_key_change=true&key_file=%2Fhome%2Fairflow%2F.ssh%2Fid_rsa'

Example connection string with ``private_key`` (actual private key provided in connection):

Expand Down
Loading