
[AIRFLOW-6040] Fix KubernetesJobWatcher Read time out error #6643

Closed
wants to merge 13 commits

Conversation


@maxirus commented Nov 22, 2019

Jira

Description

  • Here are some details about my PR, including screenshots of any UI changes:
    • Setting timeout_seconds=50 in the Watch() loop causes a warning instead of an exception when no pods match the worker_uuid label. timeout_seconds targets the list_namespaced_pod method, as opposed to _request_timeout on the underlying urllib3 library, which throws an exception when it expires. (A sketch of the idea follows below.)
    • Adding worker_uuid to the log message so users know which label is being watched.
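
A minimal sketch of the idea, assuming the standard kubernetes Python client (the function shape and surrounding code are illustrative, not the exact PR diff):

    from kubernetes import watch

    def watch_worker_pods(kube_client, namespace, worker_uuid, resource_version=None):
        watcher = watch.Watch()
        kwargs = {
            'label_selector': 'airflow-worker={}'.format(worker_uuid),
            # Server-side timeout: the watch returns normally after ~50s with no
            # matching events, instead of urllib3 raising ReadTimeoutError client-side.
            'timeout_seconds': 50,
        }
        if resource_version:
            kwargs['resource_version'] = resource_version
        for event in watcher.stream(kube_client.list_namespaced_pod, namespace, **kwargs):
            yield event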

Tests

Commits

Documentation

@dimberman self-requested a review November 22, 2019 23:29
Contributor

@dimberman left a comment


This LGTM thank you for catching this! Please fix the flake8 issues and once tests pass I'll gladly merge :)

@mik-laj added the k8s label Nov 25, 2019
@codecov-io

codecov-io commented Nov 25, 2019

Codecov Report

❗ No coverage uploaded for pull request base (master@67a0f39).
The diff coverage is 84.35%.


@@            Coverage Diff            @@
##             master    #6643   +/-   ##
=========================================
  Coverage          ?   84.31%           
=========================================
  Files             ?      676           
  Lines             ?    38353           
  Branches          ?        0           
=========================================
  Hits              ?    32338           
  Misses            ?     6015           
  Partials          ?        0


@YuvalItzchakov

Perhaps setting this as a configuration value, and falling back to a constant in case it doesn't exist?

watcher = watch.Watch()

# current:
kwargs = {'label_selector': 'airflow-worker={}'.format(worker_uuid)}
# proposed:
kwargs = {'label_selector': 'airflow-worker={}'.format(worker_uuid),
          'timeout_seconds': 50}
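
A minimal sketch of that suggestion, assuming a hypothetical watcher_timeout_seconds option under the [kubernetes] section (the option name does not exist in Airflow and is only illustrative; worker_uuid is assumed to be in scope as in the watcher code):

    from airflow.configuration import conf

    DEFAULT_WATCH_TIMEOUT_SECONDS = 50  # fallback constant

    try:
        # Hypothetical option name, shown only to illustrate the fallback idea.
        watch_timeout = conf.getint('kubernetes', 'watcher_timeout_seconds')
    except Exception:
        watch_timeout = DEFAULT_WATCH_TIMEOUT_SECONDS

    kwargs = {'label_selector': 'airflow-worker={}'.format(worker_uuid),
              'timeout_seconds': watch_timeout}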
Member


Yeah, this should be a config variable at the least.

Also it would be good if

# **kwargs parameters to pass while calling a kubernetes client core_v1_api methods from Kubernetes Executor
# provided as a single line formatted JSON dictionary string.
# List of supported params in **kwargs are similar for all core_v1_apis, hence a single config variable for all apis
# See:
# https://raw.githubusercontent.com/kubernetes-client/python/master/kubernetes/client/apis/core_v1_api.py
kube_client_request_args =
applied here (if it isn't already?) so that this can be set globally for all Kube API calls from Airflow.
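
For reference, a hedged example of how that option is usually supplied (the values are illustrative; [60, 60] is the (connect, read) timeout pair discussed later in this thread):

    # airflow.cfg
    [kubernetes]
    kube_client_request_args = { "_request_timeout": [60, 60] }

    # or as an environment variable
    AIRFLOW__KUBERNETES__KUBE_CLIENT_REQUEST_ARGS='{ "_request_timeout": [60, 60] }'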

Member


Wait, we already use the config option two lines down.

Someone on slack mentioned that this won't work but I don't see why from the code.


@gfeldman Nov 25, 2019


If kube_client_request_args is used the Kubernetes executor fails to kick off tasks and the scheduler throws this exception:

[2019-11-25 18:02:53,397] {scheduler_job.py:1352} ERROR - Exception when executing execute_helper
Traceback (most recent call last):
  File "/usr/local/lib/python3.7/site-packages/airflow/jobs/scheduler_job.py", line 1350, in _execute
    self._execute_helper()
  File "/usr/local/lib/python3.7/site-packages/airflow/jobs/scheduler_job.py", line 1439, in _execute_helper
    self.executor.heartbeat()
  File "/usr/local/lib/python3.7/site-packages/airflow/executors/base_executor.py", line 136, in heartbeat
    self.sync()
  File "/usr/local/lib/python3.7/site-packages/airflow/contrib/executors/kubernetes_executor.py", line 801, in sync
    self.kube_scheduler.run_next(task)
  File "/usr/local/lib/python3.7/site-packages/airflow/contrib/executors/kubernetes_executor.py", line 456, in run_next
    self.launcher.run_pod_async(pod, **self.kube_config.kube_client_request_args)
  File "/usr/local/lib/python3.7/site-packages/airflow/contrib/kubernetes/pod_launcher.py", line 62, in run_pod_async
    resp = self._client.create_namespaced_pod(body=req, namespace=pod.namespace, **kwargs)
  File "/usr/local/lib/python3.7/site-packages/kubernetes/client/apis/core_v1_api.py", line 6115, in create_namespaced_pod
    (data) = self.create_namespaced_pod_with_http_info(namespace, body, **kwargs)
  File "/usr/local/lib/python3.7/site-packages/kubernetes/client/apis/core_v1_api.py", line 6148, in create_namespaced_pod_with_http_info
    " to method create_namespaced_pod" % key
TypeError: Got an unexpected keyword argument 'timeout_seconds' to method create_namespaced_pod

Contributor

@davlum Nov 26, 2019


I believe the correct argument name here is _request_timeout. I can't link the generated Python API file as it is too large for GitHub, but it's on line 6141 of https://github.com/kubernetes-client/python/blob/master/kubernetes/client/api/core_v1_api.py.

This doc link for the kube_client_request_args is dead. It also states:

List of supported params in **kwargs are similar for all core_v1_apis, hence a single config variable for all apis

I feel like this is the wrong approach, as these settings should be configurable on a per-request basis, but that's another matter and much more complex. For example, this label_selector argument would fail if passed to the create_namespaced_pod function.

@maxirus
Author

maxirus commented Nov 27, 2019

The kube_client_request_args config parameter is "global" to all of the client requests. Setting timeout_seconds there causes other methods, such as create_namespaced_pod, to fail because the property is not recognized. If the ask here is to make this configurable, we'd need to do so by adding another parameter to airflow.cfg so as not to break current functionality.

My approach assumes that the default _request_timeout of 60 in the config was deliberate, so I wanted to stay under that value by a considerable margin. That said, querying the API should realistically never take more than about 1s.

If we didn't want to "hard-code" this value here, the other approaches I see are:

  1. Create a new config parameter, watch_timeout_seconds
  2. If "_request_timeout" is in kube_client_request_args, then set timeout_seconds = kube_client_request_args['_request_timeout'] - 1 (a sketch of this follows after the list)
  3. Check whether there are any events for the label/worker UUID first, before the watch; if so, then watch
  4. Leave it hard-coded
  5. Other ideas?
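
A minimal sketch of option 2, assuming kube_client_request_args has already been parsed into a dict, and handling both the single-number and [connect, read] forms that _request_timeout can take:

    def derive_watch_timeout(kube_client_request_args, default=50):
        """Derive a server-side watch timeout from the client-side _request_timeout."""
        request_timeout = kube_client_request_args.get('_request_timeout')
        if request_timeout is None:
            return default
        # _request_timeout may be a single number or a (connect, read) pair.
        if isinstance(request_timeout, (list, tuple)):
            request_timeout = request_timeout[0]
        # Stay under the client-side timeout so the watch ends before urllib3 gives up.
        return max(int(request_timeout) - 1, 1)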

P.S.: The Worker UUID seems to not persist and is created at runtime. If I follow correctly, this gets generated each time the scheduler runs. How is this tracked across restarts?

@ashb
Member

ashb commented Nov 27, 2019

P.S.: The Worker UUID seems to not persist and is created at runtime. If I follow correctly, this gets generated each time the scheduler runs. How is this tracked across restarts?

There's an airflow.models.kubernetes.KubeWorkerIdentifier table with a singleton row where it should be stored.

@davlum
Contributor

davlum commented Nov 27, 2019

@maxirus I would favour either 1. or 4. for simplicity.

@ashb
Member

ashb commented Nov 27, 2019

Leaving it hard-coded is going to break it if someone changes the default _request_timeout from 60s.

kube_client_request_args.get('_request_timeout', [60,60])[0] - 1

@maxirus
Author

maxirus commented Dec 4, 2019

@ashb is this kube_client_request_args.get('_request_timeout', [60,60])[0] - 1 the way you want to handle it?

@dimberman
Contributor

@ashb @davlum bumping this ticket as I would like to get this merged.

@dmateusp
Contributor

Same! I'm using this environment variable as a workaround for now:

 AIRFLOW__KUBERNETES__KUBE_CLIENT_REQUEST_ARGS: '{ "_request_timeout": "50" }'

@ashb
Member

ashb commented Dec 12, 2019

@ashb is this kube_client_request_args.get('_request_timeout', [60,60])[0] - 1 the way you want to handle it?

@maxirus Yes, and we should possibly put a min() around it -- min(kube_client_request_args.get('_request_timeout', [60,60])[0] - 1, 1) say?

@maxirus
Author

maxirus commented Dec 13, 2019

I can implement it like this but do we really want the timeout to always be 1 for any integer >0?

@ashb
Member

ashb commented Dec 13, 2019

I can implement it like this but do we really want the timeout to always be 1 for any integer >0?

🤦‍♂ No, not at all. Broken logic. max, not min, was what I wanted. (I.e. to ensure that the timeout never goes negative; 0 probably means no timeout. Maybe we want to allow that, but I suspect we don't.)

Member

@ashb left a comment


If possible it would be nice to add some unit tests too.

for key, value in kube_config.kube_client_request_args.items():
    kwargs[key] = value
conn_timeout = kube_config.kube_client_request_args.get('_request_timeout', [60, 60])[0]
kwargs['timeout_seconds'] = conn_timeout - 1 if conn_timeout - 1 > 0 else 1
Member


Suggested change:
- kwargs['timeout_seconds'] = conn_timeout - 1 if conn_timeout - 1 > 0 else 1
+ kwargs['timeout_seconds'] = max(conn_timeout - 1, 1)

(Assuming they are integers.)
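
Following up on the unit-test ask in this review, a minimal sketch of what a test for this derivation could look like (the helper is extracted here purely for testability; names and module layout are hypothetical, and testing the real KubernetesJobWatcher would need more scaffolding):

    import unittest

    def derive_watch_timeout(kube_client_request_args):
        # Same logic as the suggested change above, pulled out as a function.
        conn_timeout = kube_client_request_args.get('_request_timeout', [60, 60])[0]
        return max(conn_timeout - 1, 1)

    class TestWatchTimeout(unittest.TestCase):
        def test_default_pair(self):
            self.assertEqual(derive_watch_timeout({}), 59)

        def test_explicit_pair(self):
            self.assertEqual(derive_watch_timeout({'_request_timeout': [30, 30]}), 29)

        def test_never_drops_below_one(self):
            self.assertEqual(derive_watch_timeout({'_request_timeout': [1, 1]}), 1)

    if __name__ == '__main__':
        unittest.main()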

@maxirus
Author

maxirus commented Dec 17, 2019

If possible it would be nice to add some unit tests too.

Will try to get to this later this week.

@mbelang

mbelang commented Jan 16, 2020

@maxirus Any progress?

@maxirus
Author

maxirus commented Jan 17, 2020

@mbelang No. Between the holidays & work I have not had time. Hoping to have some time this weekend.

@mbelang

mbelang commented Jan 17, 2020

This mitigated the problem at least :)

AIRFLOW__KUBERNETES__KUBE_CLIENT_REQUEST_ARGS: '{ "_request_timeout": "50" }'

What is the default timeout currently?

@boring-cyborg bot added the area:Scheduler including HA (high availability) scheduler label Feb 4, 2020
@maxirus
Author

maxirus commented Feb 4, 2020

Sooo, the test framework has a high barrier to entry and there don't seem to be any existing tests for the KubernetesJobWatcher class to piggyback on. I attempted to get the test platform going, reading what documentation there is, but I just don't have the time to invest in it.

@maxirus closed this Feb 4, 2020
sbrandtb added a commit to FRI-DAY/python that referenced this pull request Feb 4, 2020
If `_request_timeout` is neither an int nor a 2-tuple, it is swallowed
without further notice, which is rather unfortunate because the level at
which developers would have to look for this issue is pretty deep.

This actually leads to confusion already, see
apache/airflow#6643 (comment)

While it would break backwards compatibility to raise an exception, we
should at least warn the developer.
@sbrandtb
Contributor

sbrandtb commented Feb 4, 2020

@mbelang Unfortunately, your mitigation disables the timeout completely, see kubernetes-client/python#1069.

Passing a string there makes the timeout disappear and lets the scheduler process wait forever. I have not checked deep enough to find out if that is a real problem or not though.
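
To illustrate the distinction (the first two forms are what the kubernetes Python client expects for _request_timeout; the string form is what the JSON workaround above ends up passing, and per the comment above it is silently swallowed):

    # Accepted by the kubernetes Python client:
    kwargs_ok_single = {'_request_timeout': 50}      # one timeout for connect and read
    kwargs_ok_pair = {'_request_timeout': (5, 50)}   # (connect, read) timeouts
    # Neither int nor 2-tuple, so it is silently ignored, i.e. effectively no timeout:
    kwargs_broken = {'_request_timeout': "50"}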

@ashb
Member

ashb commented Feb 5, 2020

The kube tests in particular are the hardest to test, yes :(

@vasinata

Was this resolved? Setting AIRFLOW__KUBERNETES__KUBE_CLIENT_REQUEST_ARGS: '{ "_request_timeout": "50" }' did not resolve our issue with KubernetesJobWatcher

@brtasavpatel

Was this resolved? Setting AIRFLOW__KUBERNETES__KUBE_CLIENT_REQUEST_ARGS: '{ "_request_timeout": "50" }' did not resolve our issue with KubernetesJobWatcher

same here.

@pvcnt

pvcnt commented Feb 29, 2020

Hi, has there been any follow-up on this? This is really a blocker, as it seems like the KubernetesExecutor is completely broken, with no workaround.

@pvcnt

pvcnt commented Feb 29, 2020

I just tested the fix presented here and I confirm it does work. Was this PR closed only because of the lack of a unit test?

@donotpush

I found this guide very useful for those setting up Airflow with the Kubernetes executor for the first time: https://github.com/stwind/airflow-on-kubernetes

@sbrandtb
Contributor

sbrandtb commented Mar 1, 2020

@pvcnt Please check out my last comment: The fix does not work as intended.

The Kubernetes job watcher is a process that solely loops over the Kubernetes API, waiting for changes to Pods. That's all it does. Whether it crashes with a timeout or gets restarted virtually does not matter. What you are seeing is a false alarm.

The fix should rather be that the timeout is gracefully handled. I will provide a patch when I find some time for doing it.

Again: The issue here does not impair the operations of the KubernetesExecutor.

@pvcnt

pvcnt commented Mar 1, 2020

@sbrandtb From what I understood, the whole purpose is to set a server-side timeout and handle it gracefully, instead of relying on a client-side timeout that triggers an exception, isn't it? What other approach do you propose?

I had several issues going on at the same time in my cluster, so maybe what I was observing was caused by another issue (since solved). But still, in the current state there is log pollution that makes it much more difficult to identify a real problem.

@sbrandtb
Contributor

sbrandtb commented Mar 4, 2020

@pvcnt No, this MR does nothing to catch the inevitable request timeout exception. It only fiddles with the timeout parameters, which, as I explained before, does not help. The problem is that there will always be a timeout, and that just needs to be handled. #7616 does that.

@maxirus
Author

maxirus commented Mar 5, 2020

@sbrandtb You shouldn't catch (and subsequently ignore) a connection/response timeout error. _request_timeout sets the connection timeout and response timeout for the underlying urllib3 library. This timeout is being hit because the Watcher sends no data back when it doesn't have any matches for its query (it's a stream). Catching it would hide the real error in the case where the API server is not responding.

Setting timeout_seconds tells the Watcher to just stop watching and allow processing to continue. This PR is the proper fix, as it gracefully handles the case where there are no active pods (Airflow tasks) running with this particular UUID and the API server has no data about them.

@ashb @dimberman I would suggest taking another look at PR #7616

@sbrandtb
Contributor

sbrandtb commented Mar 5, 2020

@maxirus Sorry, my bad. I did not see in fact that you were setting timeout_seconds.

However, I still disagree with setting the _request_timeout to [60, 60] by default. See my referenced merge request in the Kubernetes client about how broken the handling of that parameter is in the openapi-generator project.

Either:

  • You just assume you are smarter than everyone else and that no one needs to set another timeout and just hard code it to some reasonable value (like, 10 seconds - why not?)
  • or you use whatever is set in Airflow's settings
  • or create a new setting for this

Because, if the request timeout from settings is something other than [60, 60], that means the user consciously put in that value, since the former is the default in Airflow. Please do not double-default.

But in general I agree that setting timeout_seconds is the right approach to fix the actual issue.

@maxirus
Author

maxirus commented Mar 6, 2020

@sbrandtb I think you should take another look at the PR and read the comments in this thread again. My PR doesn't change the _request_timeout in any way.

I still disagree with you setting the _request_timeout to [60, 60] by default

Where am I setting this?

You just assume you are smarter than everyone else and that no one needs to set another timeout and just hard code it to some reasonable value (like, 10 seconds - why not?)

Nope... It's been configurable for a number of releases now and I didn't set this default value.

or you use whatever is set in Airflow's settings

Yep.

or create a new setting for this

Again, read the comments please. That is not how the maintainers wanted to handle it (see here)

...because the first is the default in Airflow. Please, do not double-default.

Where's the double default?

@benbendemo

I want to know how to fix this right now. From all of the viewpoints above, we know that we need to pass a timeout_seconds keyword parameter from KubernetesJobWatcher into watch.py.

def _run(self, kube_client, resource_version, worker_uuid, kube_config):
        self.log.info(
            'Event: and now my watch begins starting at resource_version: %s',
            resource_version
        )
        watcher = watch.Watch()
        kwargs = {'label_selector': 'airflow-worker={}'.format(worker_uuid)}
      
        if resource_version:
            kwargs['resource_version'] = resource_version
        if kube_config.kube_client_request_args:
            for key, value in kube_config.kube_client_request_args.items():
                kwargs[key] = value

        last_resource_version = None
        for event in watcher.stream(kube_client.list_namespaced_pod, self.namespace,
                                    **kwargs):
            task = event['object']
            self.log.info(
                'Event: %s had an event of type %s',
                task.metadata.name, event['type']
            )
            if event['type'] == 'ERROR':
                return self.process_error(event)
            self.process_status(
                task.metadata.name, task.status.phase, task.metadata.labels,
                task.metadata.resource_version
            )
            last_resource_version = task.metadata.resource_version

        return last_resource_version

I guess changing

kwargs = {'label_selector': 'airflow-worker={}'.format(worker_uuid)}

into

kwargs = {'label_selector': 'airflow-worker={}'.format(worker_uuid), 'timeout_seconds': 50}

would do, right?

However, what I wonder is this: the problem has existed since apache-airflow 1.10.6 (see AIRFLOW-6040), so how come it hasn't been fixed this way as of 1.10.9? Perhaps the Airflow team didn't regard this as a solution?

@maxirus

@pvcnt

pvcnt commented Nov 26, 2020

Would it be possible to re-open this PR and consider applying this fix (or a similar one)? This issue is still present in the latest release of Airflow (scheduler logs are polluted with ReadTimeoutError), and setting timeout_seconds is a fix that works.

Labels
area:Scheduler including HA (high availability) scheduler