
Fix k8s pod.execute randomly stuck indefinitely by logs consumption (#23497) #23658

Closed
wants to merge 25 commits

Conversation

schattian (Contributor) commented May 11, 2022

Related: #23618
Fixes the leaking event loop in 67474cc. That also involved changing get_event_loop to new_event_loop so the loop is closable (otherwise, parallel usage of the function could try to reuse a closed loop).
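
A minimal sketch of that pattern, for reference (placeholder function names, not the PR's actual code): each call owns a fresh loop that it can safely close, rather than closing a loop that get_event_loop might hand to other callers.

```python
import asyncio

def fetch_logs_sync():
    """Placeholder for the blocking log-stream consumption."""
    return "logs"

def consume_logs():
    # A fresh loop is owned by this call and can be closed safely; with
    # get_event_loop(), a parallel caller could end up reusing a closed loop.
    loop = asyncio.new_event_loop()
    try:
        return loop.run_until_complete(loop.run_in_executor(None, fetch_logs_sync))
    finally:
        loop.close()

print(consume_logs())
```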

Finally, it fixes the previously broken (modified) test. Note that one call to container_is_running had leaked into a test that only made three such calls itself.

Unfortunately, I can't run the full CI test setup on my machine (OOM).

schattian and others added 8 commits May 10, 2022 15:01
* 'main' of github.com:apache/airflow:
  Revert "Fix k8s pod.execute randomly stuck indefinitely by logs consumption (apache#23497) (apache#23618)" (apache#23656)
  Rename cluster_policy to task_policy (apache#23468)
  [FEATURE] google provider - BigQueryInsertJobOperator log query (apache#23648)
  Fix k8s pod.execute randomly stuck indefinitely by logs consumption (apache#23497) (apache#23618)
  Fixed test and remove pytest.mark.xfail for test_exc_tb (apache#23650)
  Added kubernetes version (1.24) in README.md(for Main version(dev)), … (apache#23649)
  Add `RedshiftDeleteClusterOperator` support (apache#23563)
  Added postgres 14 to support versions(including breeze) (apache#23506)
  Don't run pre-migration checks for downgrade (apache#23634)
  Add index for event column in log table (apache#23625)
  Simplify flash message for _airflow_moved tables (apache#23635)
  Fix assuming "Feature" answer on CI when generating docs (apache#23640)
  Fix typo issue (apache#23633)
  [FEATURE] add K8S 1.24 support (apache#23637)
  [FEATURE] update K8S-KIND to 0.13.0 (apache#23636)
  Prevent KubernetesJobWatcher getting stuck on resource too old (apache#23521)
  Make provider doc preparation a bit more fun :) (apache#23629)
@schattian schattian requested a review from jedcunningham as a code owner May 11, 2022 22:28
@boring-cyborg boring-cyborg bot added provider:cncf-kubernetes Kubernetes provider related issues area:providers labels May 11, 2022
@schattian schattian changed the title 23497 tasks stuck Fix k8s pod.execute randomly stuck indefinitely by logs consumption (#23497) May 11, 2022
@kaxil kaxil requested a review from dstandish May 12, 2022 12:25
schattian (Contributor, Author) commented May 12, 2022

I've now added an explicit shutdown of the executor. However, it's strange: loop.close should already be shutting down the default executor: https://github.dev/python/cpython/blob/f882d33778ee2625ab32d90e28edb6878fb8af93/Lib/asyncio/base_events.py#L678-L683
If that doesn't work, it is probably a bug somewhere else, or a typo I'm not seeing...
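
For context, a small standalone sketch of the distinction (placeholder names, not the operator code): the linked close() only calls executor.shutdown(wait=False), so an explicit, waiting shutdown is still a separate step on Python 3.9+.

```python
import asyncio
import time

def blocking_job():
    time.sleep(2)  # stands in for the synchronous log stream

loop = asyncio.new_event_loop()
try:
    loop.run_until_complete(loop.run_in_executor(None, blocking_job))
finally:
    # loop.close() shuts the default executor down with wait=False, i.e. it
    # does not wait for still-running work; this call does wait.
    loop.run_until_complete(loop.shutdown_default_executor())  # Python 3.9+
    loop.close()
```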

schattian (Contributor, Author) commented May 12, 2022

Looking at this again - clearly an oversight on my side. The thread keeps running the sync function (logs_stream), blocking the executor; cancel is only useful while the task is still pending.
I will add a workaround for this later today.
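
A tiny standalone illustration of that point (not the operator code): cancelling the future does nothing for a blocking call that a worker thread has already picked up.

```python
import asyncio
import time

def blocking_stream():
    # Stands in for the synchronous log-stream consumption.
    time.sleep(5)
    print("blocking_stream finished anyway")

async def main():
    loop = asyncio.get_running_loop()
    fut = loop.run_in_executor(None, blocking_stream)
    await asyncio.sleep(0.1)  # the executor thread is already running the job
    fut.cancel()              # only cancels the asyncio future ...
    # ... the worker thread keeps executing blocking_stream, so shutting the
    # loop/executor down still waits on (or leaks) that thread.

asyncio.run(main())
```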

potiuk (Member) commented May 12, 2022

Yeah. Using asyncio is kinda tricky for a standalone operator, and it sounds a bit tricky with this approach: it seems we start an asyncio loop to run async operations and then kill the thread after closing the operator. That is rather complex, and we do not really get the "async" benefits.

Just a thought - maybe we should somehow plug this into the "deferrable" operators and the triggerer? I am not sure whether getting the logs via the triggerer and triggers would be efficient. Or maybe we should abandon the idea of async operations here altogether, as it does not really give us anything and only causes troublesome testing and difficult debugging in the future?

@andrewgodwin - maybe you can share your thoughts on that?

schattian (Contributor, Author) commented May 12, 2022

@potiuk I had one idea for reducing the complexity of this: releasing the connection (response.release_conn). Based on https://github.dev/urllib3/urllib3/blob/37a67739ba9a7835886555744e9dabe3009e538a/src/urllib3/response.py#L708-L712 it seems like it could work.

It is much harder to test, though, as right now the test is just a simple generator.
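
Roughly the idea, as far as I can tell (hypothetical pod/container names, and it assumes the log call is made with `_preload_content=False` so the raw urllib3 response is available to release):

```python
import sys
import threading
import time
from kubernetes import client, config

config.load_kube_config()
v1 = client.CoreV1Api()

# Raw urllib3 HTTPResponse, thanks to _preload_content=False.
resp = v1.read_namespaced_pod_log(
    name="my-pod", namespace="default", container="base",
    follow=True, _preload_content=False,
)

def consume():
    # May block indefinitely if the stream hangs.
    for chunk in resp.stream():
        sys.stdout.write(chunk.decode("utf-8", errors="replace"))

t = threading.Thread(target=consume, daemon=True)
t.start()

time.sleep(30)       # stand-in for "wait until the container has completed"
resp.release_conn()  # release the connection; whether this reliably unblocks
t.join(timeout=5)    # a hung read is exactly the part that is hard to test
```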

schattian (Contributor, Author) commented:

@potiuk I replaced the asyncio-plus-executor approach with multiprocessing.Process.
I also added one more test case for the scenario where the API hangs and produces no logs.
IMHO this is the best way to avoid overcomplicating things - wdyt?
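
A self-contained sketch of the shape of that approach (simulated with sleeps; the real code is driven by container completion rather than fixed timeouts):

```python
import multiprocessing
import time

def consume_logs():
    # Stand-in for the blocking log-stream consumption; here it simply hangs,
    # which is the failure mode being worked around.
    time.sleep(3600)

if __name__ == "__main__":
    proc = multiprocessing.Process(target=consume_logs, daemon=True)
    proc.start()

    time.sleep(1)          # stand-in for "the container has completed"
    proc.join(timeout=2)   # grace period for the stream to finish on its own
    if proc.is_alive():
        proc.terminate()   # a thread could not be stopped like this
        proc.join()
    print("log consumer cleaned up")
```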

dstandish (Contributor) commented:

@schattian any reason you are using multiprocessing instead of a thread?

schattian (Contributor, Author) commented Jun 7, 2022

@schattian any reason you are using multiprocessing instead of a thread?

@dstandish
Yep. A thread could hang indefinitely, just as the kube logs API stream can, so I need to be able to terminate the process.

However, I am not very knowledgeable about Python's API for this kind of thing, so I'm open to suggestions.

potiuk (Member) commented Jun 7, 2022

@schattian any reason you are using multiprocessing instead of a thread?

@dstandish Yep. A thread could hang indefinitely, just as the kube logs API stream can, so I need to be able to terminate the process.

However, I am not very knowledgeable about Python's API for this kind of thing, so I'm open to suggestions.

OK. I will wait for this to be merged before releasing the new wave of providers then - I think it might need some more deliberation (we will include it in the next wave then).

dstandish (Contributor) commented:

Yep. A thread could hang indefinitely, just as the kube logs API stream can, so I need to be able to terminate the process.

However, I am not very knowledgeable about Python's API for this kind of thing, so I'm open to suggestions.

OK yeah, thing I learned: killing threads is not well-supported. Interestingly, something similar is done for ECS, and threads are used there, but they don't deal with the indefinite-hanging problem (and therefore don't have to forcibly terminate anything).

One alternative I'll just mention is that we could handle this by doing our own "following" instead of relying on the stream, i.e. we do our own loop: consume the latest logs, sleep a second, and repeat. This might allow for a cleaner solution (see the sketch right below).
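
(Purely to illustrate that alternative - hypothetical pod name, assumes a reachable cluster, and not part of this PR:)

```python
import time
from kubernetes import client, config

config.load_kube_config()
v1 = client.CoreV1Api()

def pod_is_running(name, namespace):
    return v1.read_namespaced_pod(name=name, namespace=namespace).status.phase == "Running"

# Poll for "logs written since the last poll" instead of holding a follow=True
# stream open. since_seconds is only second-granular, so lines near the
# boundary can show up twice - the drawback raised in the reply below.
poll_interval = 1
while pod_is_running("my-pod", "default"):
    chunk = v1.read_namespaced_pod_log(
        name="my-pod", namespace="default", container="base",
        since_seconds=poll_interval + 1,
    )
    if chunk:
        print(chunk, end="")
    time.sleep(poll_interval)
```
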
But multiprocessing seems OK. I'll just mention a few things.

One is that I think you need to kill the logging process when on_kill is called.

The other is that if you get an error in await_container_completion, your subprocess won't be killed. So you should probably change it so that if an unexpected error occurs after the process has started, the process is killed (see the sketch after this comment).

Also, I would make a note somewhere that this is due to a problem in Kubernetes, and that what we are doing here is a hack that should be removed once the issue has been resolved upstream (and propagated sufficiently, which of course could be a long time from now). The upstream issue is documented at kubernetes/kubernetes#59902.
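
To make the error-path point above concrete, a minimal standalone sketch (the real await_container_completion takes arguments; here it just simulates an unexpected failure):

```python
import multiprocessing
import time

def stream_logs():
    time.sleep(3600)  # stand-in for a log stream that hangs

def await_container_completion():
    raise RuntimeError("simulated unexpected error while waiting for the container")

if __name__ == "__main__":
    log_process = multiprocessing.Process(target=stream_logs, daemon=True)
    log_process.start()
    try:
        await_container_completion()
        log_process.join(timeout=10)
    finally:
        # Without this guard, the exception above would leave the log process
        # running until the parent exits.
        if log_process.is_alive():
            log_process.terminate()
            log_process.join()
```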

dstandish (Contributor) commented:

Also, I'd make the two methods you've added "private".

dstandish (Contributor) commented:

Separate question for you @schattian: suppose the stream hangs and the pod is still running. If we terminate the process and restart logs consumption, will it get logs from the new location? Or will it still look at the original location, so that any new logging is unreachable and it will immediately hang again?

schattian (Contributor, Author) commented Jun 16, 2022

One alternative I'll just mention is that we could handle this by doing our own "following" instead of relying on the stream, i.e. we do our own loop: consume the latest logs, sleep a second, and repeat.

You mean calling the kubectl logs API X number of times? I think that's worse than a follow, mostly because you'll get some lines printed twice (and it could be expensive for some DAGs).

One is that I think you need to kill the logging process when on_kill is called.

Sorry, I don't think I understand the point here.

The other is that if you get an error in await_container_completion, your subprocess won't be killed. So you should probably change it so that if an unexpected error occurs after the process has started, the process is killed.

Hmm, but in that case aren't we going to propagate a SIGTERM (i.e. killing the subprocess implicitly)?
I was relying on that, but if that's not how it works / could change, I will modify it.

Also, I'd make the two methods you've added "private".

👍🏻

Separate question for you @schattian: suppose the stream hangs and the pod is still running. If we terminate the process and restart logs consumption, will it get logs from the new location? Or will it still look at the original location, so that any new logging is unreachable and it will immediately hang again?

That is correct. A call to the kubectl logs API after the stream has hung will return the newest logs.
But there are a few problems with that:

  1. The pod could be deleted, and in that case you'll receive no logs.
  2. We detect that the stream has hung as "the container is not running and stream consumption didn't finish", as there is no better way to do it (please let me know if you have one). In that case

I mentioned this in the first PR: #23618 (comment)
I think it adds complexity that isn't worth it, as this bug doesn't occur very often.

dstandish (Contributor) commented:

One is that I think you need to kill the logging process when on_kill is called.

Sorry, I don't think I understand the point here.

In operators, if we kill the task (e.g. from the UI), shouldn't we also kill the logging subprocess? If we don't, will it keep running, e.g. on a Celery worker?

schattian (Contributor, Author) commented Jun 20, 2022

In operators, if we kill the task (e.g. from the UI), shouldn't we also kill the logging subprocess? If we don't, will it keep running, e.g. on a Celery worker?

Ah, okay, let me check that. I will also push the changes mentioned above (making the new funcs private).

schattian (Contributor, Author) commented Jun 20, 2022

@dstandish I forgot to mention: what happens in that case (and likewise whenever the pod is deleted for some other external reason that could cause, for example, a pod-not-found error) is that the kubectl stream API returns immediately, so this is not an issue (see e.g. the cncf kube pod operator's on_kill def).

I've just made the functions private.

schattian (Contributor, Author) commented:

Hm, are calls to methods wrapped by a private function mocked?
I don't see any other way the tests could time out with the new changes. They are not flaky; they fail consistently only after this addition.
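
For what it's worth, a leading-underscore method can still be patched by name (minimal sketch with hypothetical names); if the timeouts persist, the issue may instead be that the patched call now happens inside the child process, where a parent-side patch may not apply (depending on the start method).

```python
from unittest import mock

class LogReader:
    def _fetch_container_logs(self):
        return "real logs"

    def follow(self):
        return self._fetch_container_logs()

# mock does not treat single-underscore names specially (only dunder names are
# name-mangled), so patching them works as usual:
with mock.patch.object(LogReader, "_fetch_container_logs", return_value="mocked"):
    assert LogReader().follow() == "mocked"
print("patched the private method as expected")
```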

potiuk (Member) commented Jun 21, 2022

@schattian ALWAYS rebase your changes when you want to see whether the tests pass. This way you avoid wasting your and the committers' time trying to solve problems that are likely already solved.

You are 102 commits behind. Things are NOT going to work.


schattian (Contributor, Author) commented Jun 21, 2022

@potiuk uhm, I don't see how that helps.
The CI code was the same as before except for the new additions, and the tests fail for those.

I've updated it btw, let's see.

potiuk (Member) commented Jun 21, 2022

@potiuk uhm, I don't see how that helps. The CI code was the same as before except for the new additions, and the tests fail for those.

I've updated it btw, let's see.

I think this is pretty obvious - if there was an issue with some tests at the moment you branched off, it will stay there no matter how hard you try unless you rebase; if it was fixed in one of the 102 commits, rebasing gets you the fix.
Why do you think it would work otherwise?

potiuk (Member) commented Jun 21, 2022

Now - it seems you do have a real issue, likely introduced by your changes:

You need to (as usual) start breeze in your version (build the image), reproduce it by running the tests in the same environment as CI, and fix it.

potiuk (Member) commented Jun 21, 2022

I think this is pretty obvious - if there was an issue with some tests at the moment you branched off, it will stay there no matter how hard you try unless you rebase; if it was fixed in one of the 102 commits, rebasing gets you the fix.

BTW, it's not guaranteed to work. It just removes one of the potential failure reasons from the set of possible issues.

schattian (Contributor, Author) commented:

Yeah, that's what I was thinking - introduced by making the functions private.
What I think happens is that the methods that are supposed to be mocked are now called inside a private function.
I don't see how to change that behaviour other than making the functions non-private again.

dstandish (Contributor) commented:

Yeah, that's what I was thinking - introduced by making the functions private.
What I think happens is that the methods that are supposed to be mocked are now called inside a private function.
I don't see how to change that behaviour other than making the functions non-private again.

Welp, sorry - yeah, if they can't be private, they can't be private. I just want to minimize the surface area of promises we are making: when we have "public" methods, we can't change them without a deprecation.

Re:

@dstandish I forgot to mention: what happens in that case (and likewise whenever the pod is deleted for some other external reason that could cause, for example, a pod-not-found error) is that the kubectl stream API returns immediately, so this is not an issue (see e.g. the cncf kube pod operator's on_kill def).

can you clarify... when what happens?

schattian (Contributor, Author) commented:

can you clarify... when what happens?

The function consuming the logs (i.e. the process) will return immediately, so it'll terminate.

That's because the stream API will return.

dstandish (Contributor) commented:

can you clarify... when what happens?

The function consuming the logs (the process) will return immediately so it'll terminate.

That's because the stream api will return.

Oh, I think I understand what you're saying: since on_kill terminates the pod, the log stream will close and the process will die of its own accord, so there's no need to manage the termination of that process directly.

github-actions bot commented Aug 7, 2022

This pull request has been automatically marked as stale because it has not had recent activity. It will be closed in 5 days if no further activity occurs. Thank you for your contributions.

@github-actions github-actions bot added the stale Stale PRs per the .github/workflows/stale.yml policy file label Aug 7, 2022
@github-actions github-actions bot closed this Aug 13, 2022
Labels
area:providers · okay to merge (It's ok to merge this PR as it does not require more tests) · provider:cncf-kubernetes (Kubernetes provider related issues) · stale (Stale PRs per the .github/workflows/stale.yml policy file)