Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

SSH task exit code added to XCOM as 'ssh_exit' key #27370

Merged
merged 10 commits into from
Dec 4, 2022
Merged

SSH task exit code added to XCOM as 'ssh_exit' key #27370

merged 10 commits into from
Dec 4, 2022

Conversation

kolfild26
Copy link
Contributor

@kolfild26 kolfild26 commented Oct 29, 2022

Based on #23788

Motivation. Provide the ability to analyze the SSH tasks exit code in order to build a dag logic based on these codes. Now, only echo output goes to XCOM's return_value key. And in case of failure nothing being passed to return_value. So in a dag we don't know what specific code is returned.

What's done:
In SSHOperator, add XCOM key ssh_exit.
Task succeeded - ssh_exit set to 0. echo stuff still passed to return_value.
Task failed - ssh_exit stores the code which was retrived from the ssh session.

SSHOperator(
        command="pwd",
        dag=dag)

SSHOperator(
        command="pwd && exit 123",
        dag=dag)

изображение

P.S. I could imagine the alternative possible scenario - not to introduce a new XCOM key and just replace the return_value content, i.e. skip all other text output and just pass the ssh exit code there.
But my first approach does not reduce the functionality and the second one does. So I prefered the first.

Copy link
Contributor

@bdsoha bdsoha left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In order to approve the PR a check was added to Static Checks requiring the use of double-quotes.

airflow/providers/ssh/operators/ssh.py Outdated Show resolved Hide resolved
airflow/providers/ssh/operators/ssh.py Outdated Show resolved Hide resolved
@kolfild26
Copy link
Contributor Author

@bdsoha thanks, applied

Copy link
Contributor

@eladkal eladkal left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please add unit test to cover this change

def raise_for_status(self, exit_status: int, stderr: bytes) -> None:
def raise_for_status(self, exit_status: int, stderr: bytes, **kwargs) -> None:
ti=kwargs["context"].get("task_instance")
ti.xcom_push(key="ssh_exit", value=exit_status)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should verify user wish to push to xcom.
users ask for this specifically with do_xcom_push parameter

Copy link
Contributor

@bdsoha bdsoha Nov 1, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@eladkal It is not optional on other operators:

ti = context["ti"]
ti.xcom_push(key="pod_name", value=self.pod.metadata.name)
ti.xcom_push(key="pod_namespace", value=self.pod.metadata.namespace)

Copy link
Member

@potiuk potiuk left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yep. It should have unit tests added.

@kolfild26
Copy link
Contributor Author

in progress. ut will be added.

@@ -134,16 +134,18 @@ def exec_ssh_client_command(self, ssh_client: SSHClient, command: str):
ssh_client, command, timeout=self.cmd_timeout, environment=self.environment, get_pty=self.get_pty
)

def raise_for_status(self, exit_status: int, stderr: bytes) -> None:
def raise_for_status(self, exit_status: int, stderr: bytes, **kwargs) -> None:
ti=kwargs["context"].get("task_instance")
Copy link
Contributor

@bdsoha bdsoha Nov 15, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
ti=kwargs["context"].get("task_instance")
ti = kwargs["context"].get("task_instance")

@kolfild26
Copy link
Contributor Author

Hi,
looks like, it's done now. Please review changes.

  1. Fixed errors
    pre-commit run run-mypy and breeze testing tests --test-type "Providers[ssh]" were successful.
  2. Description added
  3. test_push_ssh_exit_to_xcom for the newly introduced functionality added

@kolfild26
Copy link
Contributor Author

Also some notes about the test which I added.
There were no any tests which uses conext inside themselves.
I, in turn, needed context to pull/push to/from xcom. So, I searched all from the tests and found that test_kubernetes_pod.py has create_context method. It turned out it fully fitts our need. So I copied it to test_ssh.py and used in test_push_ssh_exit_to_xcom.

Copy link
Member

@potiuk potiuk left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks really cool now :). Very much needed. Let's see if our CI agrees.

@kolfild26
Copy link
Contributor Author

@potiuk Static checks error in the #59009 run relates to sensors/base.py which was not modified in my commits.
Probably that's because last time I pulled from main 7 days ago and #27871 was pushed 3 days ago i.e you ran the version which doesn't contain these changes.
If it's needed I update my PR branch just now.

@potiuk
Copy link
Member

potiuk commented Nov 26, 2022

Yep. Rebasing is alwasy a good idea.

@kolfild26
Copy link
Contributor Author

@potiuk potiuk merged commit 2b107e6 into apache:main Dec 4, 2022
@boring-cyborg
Copy link

boring-cyborg bot commented Dec 4, 2022

Awesome work, congrats on your first merged pull request!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants