From 153bcdef0ed86f14474a21530d6caf153b5d3dd1 Mon Sep 17 00:00:00 2001 From: Omer Lachish Date: Tue, 19 Nov 2019 09:11:03 +0000 Subject: [PATCH 1/8] launch and monitor multiple workers using supervisor --- Dockerfile | 3 ++- bin/docker-entrypoint | 13 +++++++++++-- docker-compose.yml | 1 - worker.conf | 15 +++++++++++++++ 4 files changed, 28 insertions(+), 4 deletions(-) create mode 100644 worker.conf diff --git a/Dockerfile b/Dockerfile index 61ec64a557..ebb0983e1b 100644 --- a/Dockerfile +++ b/Dockerfile @@ -36,7 +36,8 @@ RUN apt-get update && \ libssl-dev \ default-libmysqlclient-dev \ freetds-dev \ - libsasl2-dev && \ + libsasl2-dev \ + supervisor && \ apt-get clean && \ rm -rf /var/lib/apt/lists/* diff --git a/bin/docker-entrypoint b/bin/docker-entrypoint index d88266fa2d..50ae335d76 100755 --- a/bin/docker-entrypoint +++ b/bin/docker-entrypoint @@ -25,13 +25,22 @@ dev_scheduler() { worker() { echo "Starting RQ worker..." - exec /app/manage.py rq worker $QUEUES + start_worker } dev_worker() { echo "Starting dev RQ worker..." - exec watchmedo auto-restart --directory=./redash/ --pattern=*.py --recursive -- ./manage.py rq worker $QUEUES + start_worker "watchmedo auto-restart --directory=./redash/ --pattern=*.py --recursive -- " +} + +start_worker() { + export EXECUTION_PREFIX=$1 + export WORKERS_COUNT=${WORKERS_COUNT:-2} + export QUEUES=${QUEUES:-} + + supervisord -c worker.conf + touch /tmp/worker.log && tail -f /tmp/worker.log } dev_celery_worker() { diff --git a/docker-compose.yml b/docker-compose.yml index d444007d9b..cf8742a619 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -41,7 +41,6 @@ services: target: /app depends_on: - server - tty: true environment: PYTHONUNBUFFERED: 0 REDASH_LOG_LEVEL: "INFO" diff --git a/worker.conf b/worker.conf new file mode 100644 index 0000000000..ffc41a8067 --- /dev/null +++ b/worker.conf @@ -0,0 +1,15 @@ +[supervisord] +logfile=/dev/stdout +logfile_maxbytes=0 +pidfile=/tmp/supervisord.pid + +[program:worker] +command=%(ENV_EXECUTION_PREFIX)s ./manage.py rq worker %(ENV_QUEUES)s +process_name=%(program_name)s-%(process_num)s +numprocs=%(ENV_WORKERS_COUNT)s +directory=/app +stopsignal=TERM +autostart=true +autorestart=true +stdout_logfile=/tmp/worker.log +redirect_stderr=true \ No newline at end of file From 4d1ae284ff2f3b8d1a45ff153d6e2e3ea36bc19a Mon Sep 17 00:00:00 2001 From: Omer Lachish Date: Tue, 19 Nov 2019 09:21:09 +0000 Subject: [PATCH 2/8] run supervisord in non-daemon mode --- bin/docker-entrypoint | 2 +- worker.conf | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/bin/docker-entrypoint b/bin/docker-entrypoint index 50ae335d76..57cbe74467 100755 --- a/bin/docker-entrypoint +++ b/bin/docker-entrypoint @@ -39,8 +39,8 @@ start_worker() { export WORKERS_COUNT=${WORKERS_COUNT:-2} export QUEUES=${QUEUES:-} + touch /tmp/worker.log && tail -f /tmp/worker.log & supervisord -c worker.conf - touch /tmp/worker.log && tail -f /tmp/worker.log } dev_celery_worker() { diff --git a/worker.conf b/worker.conf index ffc41a8067..59bcd8fbc8 100644 --- a/worker.conf +++ b/worker.conf @@ -1,7 +1,7 @@ [supervisord] -logfile=/dev/stdout -logfile_maxbytes=0 +logfile=/tmp/supervisord.log pidfile=/tmp/supervisord.pid +nodaemon=true [program:worker] command=%(ENV_EXECUTION_PREFIX)s ./manage.py rq worker %(ENV_QUEUES)s From 1fb1a7ec013b9c984acbb17c0c9ce930e4468607 Mon Sep 17 00:00:00 2001 From: Omer Lachish Date: Tue, 19 Nov 2019 10:31:32 +0000 Subject: [PATCH 3/8] redirect all output to stdout/stderr --- bin/docker-entrypoint | 1 - worker.conf | 6 ++++-- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/bin/docker-entrypoint b/bin/docker-entrypoint index 57cbe74467..893656880f 100755 --- a/bin/docker-entrypoint +++ b/bin/docker-entrypoint @@ -39,7 +39,6 @@ start_worker() { export WORKERS_COUNT=${WORKERS_COUNT:-2} export QUEUES=${QUEUES:-} - touch /tmp/worker.log && tail -f /tmp/worker.log & supervisord -c worker.conf } diff --git a/worker.conf b/worker.conf index 59bcd8fbc8..c4963af190 100644 --- a/worker.conf +++ b/worker.conf @@ -11,5 +11,7 @@ directory=/app stopsignal=TERM autostart=true autorestart=true -stdout_logfile=/tmp/worker.log -redirect_stderr=true \ No newline at end of file +stdout_logfile=/dev/stdout +stdout_logfile_maxbytes=0 +stderr_logfile=/dev/stderr +stderr_logfile_maxbytes=0 \ No newline at end of file From fdbbd1341b3dd7a988eddd048f848bbd7e362f5d Mon Sep 17 00:00:00 2001 From: Omer Lachish Date: Tue, 19 Nov 2019 10:47:49 +0000 Subject: [PATCH 4/8] no need to log supervisord's output because it is redirected to stdout anyway --- worker.conf | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/worker.conf b/worker.conf index c4963af190..5f55c32f1b 100644 --- a/worker.conf +++ b/worker.conf @@ -1,5 +1,5 @@ [supervisord] -logfile=/tmp/supervisord.log +logfile=/dev/null pidfile=/tmp/supervisord.pid nodaemon=true From 22065a754f8ca3b8562b77c1d6268ae3bb77a4f8 Mon Sep 17 00:00:00 2001 From: Omer Lachish Date: Thu, 10 Oct 2019 15:49:24 +0300 Subject: [PATCH 5/8] updated and less brittle healthcheck --- redash/cli/rq.py | 27 +++++++++++++++++++++++++-- 1 file changed, 25 insertions(+), 2 deletions(-) diff --git a/redash/cli/rq.py b/redash/cli/rq.py index c103d6707d..fb32af7f02 100644 --- a/redash/cli/rq.py +++ b/redash/cli/rq.py @@ -44,8 +44,31 @@ def healthcheck(): all_workers = Worker.all() local_workers = [w for w in all_workers if w.hostname == hostname] + row_format ="{:>10}" * (len(local_workers) + 1) + + print("Local worker PIDs:") + local_worker_pids = set([w.pid for w in local_workers]) + print(row_format.format("", *local_worker_pids)) + + print("Time since seen:") heartbeats = [w.last_heartbeat for w in local_workers] time_since_seen = [datetime.datetime.utcnow() - hb for hb in heartbeats] - active = [t.seconds < 60 for t in time_since_seen] + print(row_format.format("", *[t.seconds for t in time_since_seen])) + seen_lately = [t.seconds < 60 for t in time_since_seen] + + print("State:") + states = [w.state for w in local_workers] + print(row_format.format("", *states)) + busy = [s == "busy" for s in states] + + print("Jobs in queues:") + jobs_in_queues = [sum([len(q.jobs) for q in w.queues]) for w in local_workers] + print(row_format.format("", *jobs_in_queues)) + has_nothing_to_do = [j == 0 for j in jobs_in_queues] + + print("Healty:") + # a healthy worker is either busy, has been seen lately or has nothing to do + healthy = [any(w) for w in zip(busy, seen_lately, has_nothing_to_do)] + print(row_format.format("", *healthy)) - sys.exit(int(not all(active))) + sys.exit(int(not all(healthy))) From 092eb6ca0a770911af16e63624685dff142b279e Mon Sep 17 00:00:00 2001 From: Omer Lachish Date: Thu, 21 Nov 2019 12:39:10 +0000 Subject: [PATCH 6/8] add supervisor healthchecks --- bin/docker-entrypoint | 15 ++++------- redash/cli/rq.py | 58 +++++++++++++++++++++---------------------- requirements.txt | 2 ++ worker.conf | 19 ++++++++++++-- 4 files changed, 53 insertions(+), 41 deletions(-) diff --git a/bin/docker-entrypoint b/bin/docker-entrypoint index 893656880f..cb8521c313 100755 --- a/bin/docker-entrypoint +++ b/bin/docker-entrypoint @@ -25,21 +25,16 @@ dev_scheduler() { worker() { echo "Starting RQ worker..." - start_worker + export WORKERS_COUNT=${WORKERS_COUNT:-2} + export QUEUES=${QUEUES:-} + + supervisord -c worker.conf } dev_worker() { echo "Starting dev RQ worker..." - start_worker "watchmedo auto-restart --directory=./redash/ --pattern=*.py --recursive -- " -} - -start_worker() { - export EXECUTION_PREFIX=$1 - export WORKERS_COUNT=${WORKERS_COUNT:-2} - export QUEUES=${QUEUES:-} - - supervisord -c worker.conf + exec watchmedo auto-restart --directory=./redash/ --pattern=*.py --recursive -- ./manage.py rq worker $QUEUES } dev_celery_worker() { diff --git a/redash/cli/rq.py b/redash/cli/rq.py index fb32af7f02..ce723fcee2 100644 --- a/redash/cli/rq.py +++ b/redash/cli/rq.py @@ -5,8 +5,11 @@ from click import argument from flask.cli import AppGroup -from rq import Connection, Worker +from rq import Connection +from rq.worker import Worker, WorkerStatus from sqlalchemy.orm import configure_mappers +from supervisor_checks import check_runner +from supervisor_checks.check_modules import base from redash import rq_redis_connection from redash.schedule import rq_scheduler, schedule_periodic_jobs, periodic_job_definitions @@ -37,38 +40,35 @@ def worker(queues): w.work() -@manager.command() -def healthcheck(): - hostname = socket.gethostname() - with Connection(rq_redis_connection): - all_workers = Worker.all() +class WorkerHealthcheck(base.BaseCheck): + NAME = 'RQ Worker Healthcheck' + + def __call__(self, process_spec): + all_workers = Worker.all(connection=rq_redis_connection) + worker = [w for w in all_workers if w.hostname == socket.gethostname().encode() and + w.pid == process_spec['pid']].pop() - local_workers = [w for w in all_workers if w.hostname == hostname] - row_format ="{:>10}" * (len(local_workers) + 1) + is_busy = worker.get_state() == WorkerStatus.BUSY - print("Local worker PIDs:") - local_worker_pids = set([w.pid for w in local_workers]) - print(row_format.format("", *local_worker_pids)) + time_since_seen = datetime.datetime.utcnow() - worker.last_heartbeat + seen_lately = time_since_seen.seconds < 60 - print("Time since seen:") - heartbeats = [w.last_heartbeat for w in local_workers] - time_since_seen = [datetime.datetime.utcnow() - hb for hb in heartbeats] - print(row_format.format("", *[t.seconds for t in time_since_seen])) - seen_lately = [t.seconds < 60 for t in time_since_seen] + total_jobs_in_watched_queues = sum([len(q.jobs) for q in worker.queues]) + has_nothing_to_do = total_jobs_in_watched_queues == 0 - print("State:") - states = [w.state for w in local_workers] - print(row_format.format("", *states)) - busy = [s == "busy" for s in states] + is_healthy = is_busy or seen_lately or has_nothing_to_do - print("Jobs in queues:") - jobs_in_queues = [sum([len(q.jobs) for q in w.queues]) for w in local_workers] - print(row_format.format("", *jobs_in_queues)) - has_nothing_to_do = [j == 0 for j in jobs_in_queues] + self._log("Worker %s healthcheck: Is busy? %s. " + "Seen lately? %s (%d seconds ago). " + "Has nothing to do? %s (%d jobs in watched queues). " + "==> Is healthy? %s", + worker.key, is_busy, seen_lately, time_since_seen.seconds, + has_nothing_to_do, total_jobs_in_watched_queues, is_healthy) - print("Healty:") - # a healthy worker is either busy, has been seen lately or has nothing to do - healthy = [any(w) for w in zip(busy, seen_lately, has_nothing_to_do)] - print(row_format.format("", *healthy)) + return is_healthy - sys.exit(int(not all(healthy))) + +@manager.command() +def healthcheck(): + return check_runner.CheckRunner( + 'worker_healthcheck', 'worker', None, [(WorkerHealthcheck, {})]).run() diff --git a/requirements.txt b/requirements.txt index 94e0c98bbe..14d2e94299 100644 --- a/requirements.txt +++ b/requirements.txt @@ -58,6 +58,8 @@ maxminddb-geolite2==2018.703 pypd==1.1.0 disposable-email-domains>=0.0.52 gevent==1.4.0 +supervisor==4.1.0 +supervisor_checks==0.8.1 # Install the dependencies of the bin/bundle-extensions script here. # It has its own requirements file to simplify the frontend client build process -r requirements_bundles.txt diff --git a/worker.conf b/worker.conf index 5f55c32f1b..c77b381275 100644 --- a/worker.conf +++ b/worker.conf @@ -3,8 +3,14 @@ logfile=/dev/null pidfile=/tmp/supervisord.pid nodaemon=true +[unix_http_server] +file = /tmp/supervisor.sock + +[rpcinterface:supervisor] +supervisor.rpcinterface_factory = supervisor.rpcinterface:make_main_rpcinterface + [program:worker] -command=%(ENV_EXECUTION_PREFIX)s ./manage.py rq worker %(ENV_QUEUES)s +command=./manage.py rq worker %(ENV_QUEUES)s process_name=%(program_name)s-%(process_num)s numprocs=%(ENV_WORKERS_COUNT)s directory=/app @@ -14,4 +20,13 @@ autorestart=true stdout_logfile=/dev/stdout stdout_logfile_maxbytes=0 stderr_logfile=/dev/stderr -stderr_logfile_maxbytes=0 \ No newline at end of file +stderr_logfile_maxbytes=0 + +[eventlistener:worker_healthcheck] +serverurl=AUTO +command=./manage.py rq healthcheck +stdout_logfile=/dev/stdout +stdout_logfile_maxbytes=0 +stderr_logfile=/dev/stderr +stderr_logfile_maxbytes=0 +events=TICK_60 \ No newline at end of file From 76909ea322292bfeddd78f92c4325c761558e259 Mon Sep 17 00:00:00 2001 From: Omer Lachish Date: Sun, 24 Nov 2019 08:55:48 +0000 Subject: [PATCH 7/8] remove redundant supervisor installation as it is installed by pip --- Dockerfile | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/Dockerfile b/Dockerfile index ebb0983e1b..61ec64a557 100644 --- a/Dockerfile +++ b/Dockerfile @@ -36,8 +36,7 @@ RUN apt-get update && \ libssl-dev \ default-libmysqlclient-dev \ freetds-dev \ - libsasl2-dev \ - supervisor && \ + libsasl2-dev && \ apt-get clean && \ rm -rf /var/lib/apt/lists/* From 1d946aec26ae44769e3da3e7d5108ef2d3a60580 Mon Sep 17 00:00:00 2001 From: Omer Lachish Date: Sun, 24 Nov 2019 08:56:16 +0000 Subject: [PATCH 8/8] add a 5 minute check gate --- redash/cli/rq.py | 22 +++++++++++++++++++++- 1 file changed, 21 insertions(+), 1 deletion(-) diff --git a/redash/cli/rq.py b/redash/cli/rq.py index ce723fcee2..0bf02dee51 100644 --- a/redash/cli/rq.py +++ b/redash/cli/rq.py @@ -42,11 +42,31 @@ def worker(queues): class WorkerHealthcheck(base.BaseCheck): NAME = 'RQ Worker Healthcheck' + INTERVAL = datetime.timedelta(minutes=5) + _last_check_time = {} + + + def time_to_check(self, pid): + now = datetime.datetime.utcnow() + + if pid not in self._last_check_time: + self._last_check_time[pid] = now + + if now - self._last_check_time[pid] >= self.INTERVAL: + self._last_check_time[pid] = now + return True + + return False + def __call__(self, process_spec): + pid = process_spec['pid'] + if not self.time_to_check(pid): + return True + all_workers = Worker.all(connection=rq_redis_connection) worker = [w for w in all_workers if w.hostname == socket.gethostname().encode() and - w.pid == process_spec['pid']].pop() + w.pid == pid].pop() is_busy = worker.get_state() == WorkerStatus.BUSY