-
Notifications
You must be signed in to change notification settings - Fork 14.5k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
SSH task exit code added to XCOM as 'ssh_exit' key #27370
Changes from 5 commits
6dbc2ef
2258629
f4a597c
59de9e7
6ab73a2
1ae2d81
bf6b6e3
752a918
6627c9b
ea5afa9
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||||||
---|---|---|---|---|---|---|---|---|
|
@@ -151,16 +151,18 @@ def exec_ssh_client_command(self, ssh_client: SSHClient, command: str): | |||||||
ssh_client, command, timeout=self.timeout, environment=self.environment, get_pty=self.get_pty | ||||||||
) | ||||||||
|
||||||||
def raise_for_status(self, exit_status: int, stderr: bytes) -> None: | ||||||||
def raise_for_status(self, exit_status: int, stderr: bytes, **kwargs) -> None: | ||||||||
ti=kwargs["context"].get("task_instance") | ||||||||
ti.xcom_push(key="ssh_exit", value=exit_status) | ||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think we should verify user wish to push to xcom. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @eladkal It is not optional on other operators: airflow/airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py Lines 444 to 446 in b29ca4e
|
||||||||
if exit_status != 0: | ||||||||
raise AirflowException(f"SSH operator error: exit status = {exit_status}") | ||||||||
|
||||||||
def run_ssh_client_command(self, ssh_client: SSHClient, command: str) -> bytes: | ||||||||
def run_ssh_client_command(self, ssh_client: SSHClient, command: str, **kwargs) -> bytes: | ||||||||
assert self.ssh_hook | ||||||||
exit_status, agg_stdout, agg_stderr = self.ssh_hook.exec_ssh_client_command( | ||||||||
ssh_client, command, timeout=self.timeout, environment=self.environment, get_pty=self.get_pty | ||||||||
) | ||||||||
self.raise_for_status(exit_status, agg_stderr) | ||||||||
self.raise_for_status(exit_status, agg_stderr, context=kwargs["context"]) | ||||||||
return agg_stdout | ||||||||
|
||||||||
def execute(self, context=None) -> bytes | str: | ||||||||
|
@@ -172,7 +174,7 @@ def execute(self, context=None) -> bytes | str: | |||||||
self.get_pty = self.command.startswith("sudo") or self.get_pty | ||||||||
|
||||||||
with self.get_ssh_client() as ssh_client: | ||||||||
result = self.run_ssh_client_command(ssh_client, self.command) | ||||||||
result = self.run_ssh_client_command(ssh_client, self.command, context=context) | ||||||||
enable_pickling = conf.getboolean("core", "enable_xcom_pickling") | ||||||||
if not enable_pickling: | ||||||||
result = b64encode(result).decode("utf-8") | ||||||||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.