-
Notifications
You must be signed in to change notification settings - Fork 4.4k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Monitor general RQ queues (default, periodic and schemas) #4256
Conversation
redash/monitor.py
Outdated
@@ -31,7 +31,8 @@ def get_queues(): | |||
scheduled_queue_names = db.session.query(DataSource.scheduled_queue_name).distinct() | |||
query = db.session.execute(union_all(queue_names, scheduled_queue_names)) | |||
|
|||
return ['celery'] + [row[0] for row in query] | |||
general_rq_queues = map(lambda q: 'rq:queue:' + q, ['default', 'periodic', 'schemas']) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why the rq:queue:
prefix?
Also, is there a way to ask rq
for all the active queues?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Oh, is it because that's the Redis key?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yupp
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Also, is there a way to ask rq for all the active queues?
Yeah, I refactored it and now it reports all queues (on the way, it made sense to remove the rq:queue:
prefix due to not poking directly in Redis anymore).
|
||
return queues | ||
return dict({queue: {'size': redis_connection.llen(queue)} for queue in get_celery_queues()}.items() + | ||
{queue.name: {'size': len(queue)} for queue in Queue.all(connection=redis_connection)}.items()) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
{queue.name: {'size': len(queue)} for queue in Queue.all(connection=redis_connection)}.items()) | |
{queue.name: {'size': queue.count} for queue in Queue.all(connection=redis_connection)}.items()) |
Probably better to use Queue.count
here.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just realized you're using Python's len
and not llen
. 🤦♂️ I guess it's my cue to go to sleep. 😴
@@ -26,7 +26,7 @@ def get_object_counts(): | |||
return status | |||
|
|||
|
|||
def get_queues(): | |||
def get_celery_queues(): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This function is also used in celery_tasks()
, so need to update the name there as well.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
D'oh! Fixed.
What type of PR is this? (check all applicable)
Description
This PR adds queue sizes for the general RQ queues (
default
,periodic
andschemas
) to/status.json
.Related Tickets & Documents
Mobile & Desktop Screenshots/Recordings (if there are UI changes)