Skip to content

Commit

Permalink
Switch from priority queues to function-based queues
Browse files Browse the repository at this point in the history
Fixes: #2907
  • Loading branch information
WesleyAC committed Jul 20, 2023
1 parent 107f5b3 commit e1238e8
Show file tree
Hide file tree
Showing 19 changed files with 175 additions and 76 deletions.
4 changes: 2 additions & 2 deletions bookwyrm/activitypub/base_activity.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
from bookwyrm.connectors import ConnectorException, get_data
from bookwyrm.signatures import make_signature
from bookwyrm.settings import DOMAIN, INSTANCE_ACTOR_USERNAME
from bookwyrm.tasks import app, MEDIUM
from bookwyrm.tasks import app, MISC

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -241,7 +241,7 @@ def serialize(self, **kwargs):
return data


@app.task(queue=MEDIUM)
@app.task(queue=MISC)
@transaction.atomic
def set_related_field(
model_name, origin_model_name, related_field_name, related_remote_id, data
Expand Down
22 changes: 11 additions & 11 deletions bookwyrm/activitystreams.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@

from bookwyrm import models
from bookwyrm.redis_store import RedisStore, r
from bookwyrm.tasks import app, LOW, MEDIUM, HIGH
from bookwyrm.tasks import app, STREAMS, IMPORT_TRIGGERED
from bookwyrm.telemetry import open_telemetry


Expand Down Expand Up @@ -343,7 +343,7 @@ def add_status_on_create(sender, instance, created, *args, **kwargs):

def add_status_on_create_command(sender, instance, created):
"""runs this code only after the database commit completes"""
priority = HIGH
priority = STREAMS
# check if this is an old status, de-prioritize if so
# (this will happen if federation is very slow, or, more expectedly, on csv import)
if instance.published_date < timezone.now() - timedelta(
Expand All @@ -353,7 +353,7 @@ def add_status_on_create_command(sender, instance, created):
if instance.user.local:
return
# an out of date remote status is a low priority but should be added
priority = LOW
priority = IMPORT_TRIGGERED

add_status_task.apply_async(
args=(instance.id,),
Expand Down Expand Up @@ -497,31 +497,31 @@ def remove_statuses_on_unshelve(sender, instance, *args, **kwargs):
# ---- TASKS


@app.task(queue=LOW)
@app.task(queue=STREAMS)
def add_book_statuses_task(user_id, book_id):
"""add statuses related to a book on shelve"""
user = models.User.objects.get(id=user_id)
book = models.Edition.objects.get(id=book_id)
BooksStream().add_book_statuses(user, book)


@app.task(queue=LOW)
@app.task(queue=STREAMS)
def remove_book_statuses_task(user_id, book_id):
"""remove statuses about a book from a user's books feed"""
user = models.User.objects.get(id=user_id)
book = models.Edition.objects.get(id=book_id)
BooksStream().remove_book_statuses(user, book)


@app.task(queue=MEDIUM)
@app.task(queue=STREAMS)
def populate_stream_task(stream, user_id):
"""background task for populating an empty activitystream"""
user = models.User.objects.get(id=user_id)
stream = streams[stream]
stream.populate_streams(user)


@app.task(queue=MEDIUM)
@app.task(queue=STREAMS)
def remove_status_task(status_ids):
"""remove a status from any stream it might be in"""
# this can take an id or a list of ids
Expand All @@ -536,7 +536,7 @@ def remove_status_task(status_ids):
)


@app.task(queue=HIGH)
@app.task(queue=STREAMS)
def add_status_task(status_id, increment_unread=False):
"""add a status to any stream it should be in"""
status = models.Status.objects.select_subclasses().get(id=status_id)
Expand All @@ -548,7 +548,7 @@ def add_status_task(status_id, increment_unread=False):
stream.add_status(status, increment_unread=increment_unread)


@app.task(queue=MEDIUM)
@app.task(queue=STREAMS)
def remove_user_statuses_task(viewer_id, user_id, stream_list=None):
"""remove all statuses by a user from a viewer's stream"""
stream_list = [streams[s] for s in stream_list] if stream_list else streams.values()
Expand All @@ -558,7 +558,7 @@ def remove_user_statuses_task(viewer_id, user_id, stream_list=None):
stream.remove_user_statuses(viewer, user)


@app.task(queue=MEDIUM)
@app.task(queue=STREAMS)
def add_user_statuses_task(viewer_id, user_id, stream_list=None):
"""add all statuses by a user to a viewer's stream"""
stream_list = [streams[s] for s in stream_list] if stream_list else streams.values()
Expand All @@ -568,7 +568,7 @@ def add_user_statuses_task(viewer_id, user_id, stream_list=None):
stream.add_user_statuses(viewer, user)


@app.task(queue=MEDIUM)
@app.task(queue=STREAMS)
def handle_boost_task(boost_id):
"""remove the original post and other, earlier boosts"""
instance = models.Status.objects.get(id=boost_id)
Expand Down
6 changes: 3 additions & 3 deletions bookwyrm/connectors/connector_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@

from bookwyrm import book_search, models
from bookwyrm.settings import SEARCH_TIMEOUT
from bookwyrm.tasks import app, LOW
from bookwyrm.tasks import app, CONNECTORS

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -109,7 +109,7 @@ def get_or_create_connector(remote_id):
return load_connector(connector_info)


@app.task(queue=LOW)
@app.task(queue=CONNECTORS)
def load_more_data(connector_id, book_id):
"""background the work of getting all 10,000 editions of LoTR"""
connector_info = models.Connector.objects.get(id=connector_id)
Expand All @@ -118,7 +118,7 @@ def load_more_data(connector_id, book_id):
connector.expand_book_data(book)


@app.task(queue=LOW)
@app.task(queue=CONNECTORS)
def create_edition_task(connector_id, work_id, data):
"""separate task for each of the 10,000 editions of LoTR"""
connector_info = models.Connector.objects.get(id=connector_id)
Expand Down
4 changes: 2 additions & 2 deletions bookwyrm/emailing.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
from django.template.loader import get_template

from bookwyrm import models, settings
from bookwyrm.tasks import app, HIGH
from bookwyrm.tasks import app, EMAIL
from bookwyrm.settings import DOMAIN


Expand Down Expand Up @@ -75,7 +75,7 @@ def format_email(email_name, data):
return (subject, html_content, text_content)


@app.task(queue=HIGH)
@app.task(queue=EMAIL)
def send_email(recipient, subject, html_content, text_content):
"""use a task to send the email"""
email = EmailMultiAlternatives(
Expand Down
12 changes: 6 additions & 6 deletions bookwyrm/lists_stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

from bookwyrm import models
from bookwyrm.redis_store import RedisStore
from bookwyrm.tasks import app, MEDIUM, HIGH
from bookwyrm.tasks import app, LISTS


class ListsStream(RedisStore):
Expand Down Expand Up @@ -217,14 +217,14 @@ def add_list_on_account_create_command(user_id):


# ---- TASKS
@app.task(queue=MEDIUM)
@app.task(queue=LISTS)
def populate_lists_task(user_id):
"""background task for populating an empty list stream"""
user = models.User.objects.get(id=user_id)
ListsStream().populate_lists(user)


@app.task(queue=MEDIUM)
@app.task(queue=LISTS)
def remove_list_task(list_id, re_add=False):
"""remove a list from any stream it might be in"""
stores = models.User.objects.filter(local=True, is_active=True).values_list(
Expand All @@ -239,22 +239,22 @@ def remove_list_task(list_id, re_add=False):
add_list_task.delay(list_id)


@app.task(queue=HIGH)
@app.task(queue=LISTS)
def add_list_task(list_id):
"""add a list to any stream it should be in"""
book_list = models.List.objects.get(id=list_id)
ListsStream().add_list(book_list)


@app.task(queue=MEDIUM)
@app.task(queue=LISTS)
def remove_user_lists_task(viewer_id, user_id, exclude_privacy=None):
"""remove all lists by a user from a viewer's stream"""
viewer = models.User.objects.get(id=viewer_id)
user = models.User.objects.get(id=user_id)
ListsStream().remove_user_lists(viewer, user, exclude_privacy=exclude_privacy)


@app.task(queue=MEDIUM)
@app.task(queue=LISTS)
def add_user_lists_task(viewer_id, user_id):
"""add all lists by a user to a viewer's stream"""
viewer = models.User.objects.get(id=viewer_id)
Expand Down
8 changes: 4 additions & 4 deletions bookwyrm/models/activitypub_mixin.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
from bookwyrm import activitypub
from bookwyrm.settings import USER_AGENT, PAGE_LENGTH
from bookwyrm.signatures import make_signature, make_digest
from bookwyrm.tasks import app, MEDIUM, BROADCAST
from bookwyrm.tasks import app, BROADCAST
from bookwyrm.models.fields import ImageField, ManyToManyField

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -379,7 +379,7 @@ class CollectionItemMixin(ActivitypubMixin):

activity_serializer = activitypub.CollectionItem

def broadcast(self, activity, sender, software="bookwyrm", queue=MEDIUM):
def broadcast(self, activity, sender, software="bookwyrm", queue=BROADCAST):
"""only send book collection updates to other bookwyrm instances"""
super().broadcast(activity, sender, software=software, queue=queue)

Expand All @@ -400,7 +400,7 @@ def recipients(self):
return []
return [collection_field.user]

def save(self, *args, broadcast=True, priority=MEDIUM, **kwargs):
def save(self, *args, broadcast=True, priority=BROADCAST, **kwargs):
"""broadcast updated"""
# first off, we want to save normally no matter what
super().save(*args, **kwargs)
Expand Down Expand Up @@ -444,7 +444,7 @@ def to_remove_activity(self, user):
class ActivityMixin(ActivitypubMixin):
"""add this mixin for models that are AP serializable"""

def save(self, *args, broadcast=True, priority=MEDIUM, **kwargs):
def save(self, *args, broadcast=True, priority=BROADCAST, **kwargs):
"""broadcast activity"""
super().save(*args, **kwargs)
user = self.user if hasattr(self, "user") else self.user_subject
Expand Down
4 changes: 2 additions & 2 deletions bookwyrm/models/antispam.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
from django.db.models import Q
from django.utils.translation import gettext_lazy as _

from bookwyrm.tasks import app, LOW
from bookwyrm.tasks import app, MISC
from .base_model import BookWyrmModel
from .user import User

Expand Down Expand Up @@ -65,7 +65,7 @@ class AutoMod(AdminModel):
created_by = models.ForeignKey("User", on_delete=models.PROTECT)


@app.task(queue=LOW)
@app.task(queue=MISC)
def automod_task():
"""Create reports"""
if not AutoMod.objects.exists():
Expand Down
8 changes: 4 additions & 4 deletions bookwyrm/models/import_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
Review,
ReviewRating,
)
from bookwyrm.tasks import app, LOW, IMPORTS
from bookwyrm.tasks import app, IMPORT_TRIGGERED, IMPORTS
from .fields import PrivacyLevels


Expand Down Expand Up @@ -399,7 +399,7 @@ def handle_imported_book(item):
shelved_date = item.date_added or timezone.now()
ShelfBook(
book=item.book, shelf=desired_shelf, user=user, shelved_date=shelved_date
).save(priority=LOW)
).save(priority=IMPORT_TRIGGERED)

for read in item.reads:
# check for an existing readthrough with the same dates
Expand Down Expand Up @@ -441,7 +441,7 @@ def handle_imported_book(item):
published_date=published_date_guess,
privacy=job.privacy,
)
review.save(software="bookwyrm", priority=LOW)
review.save(software="bookwyrm", priority=IMPORT_TRIGGERED)
else:
# just a rating
review = ReviewRating.objects.filter(
Expand All @@ -458,7 +458,7 @@ def handle_imported_book(item):
published_date=published_date_guess,
privacy=job.privacy,
)
review.save(software="bookwyrm", priority=LOW)
review.save(software="bookwyrm", priority=IMPORT_TRIGGERED)

# only broadcast this review to other bookwyrm instances
item.linked_review = review
Expand Down
7 changes: 3 additions & 4 deletions bookwyrm/models/relationship.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
from django.db.models import Q

from bookwyrm import activitypub
from bookwyrm.tasks import HIGH
from .activitypub_mixin import ActivitypubMixin, ActivityMixin
from .activitypub_mixin import generate_activity
from .base_model import BookWyrmModel
Expand Down Expand Up @@ -142,7 +141,7 @@ def save(self, *args, broadcast=True, **kwargs): # pylint: disable=arguments-di

# a local user is following a remote user
if broadcast and self.user_subject.local and not self.user_object.local:
self.broadcast(self.to_activity(), self.user_subject, queue=HIGH)
self.broadcast(self.to_activity(), self.user_subject)

if self.user_object.local:
manually_approves = self.user_object.manually_approves_followers
Expand All @@ -166,7 +165,7 @@ def accept(self, broadcast_only=False):
actor=self.user_object.remote_id,
object=self.to_activity(),
).serialize()
self.broadcast(activity, user, queue=HIGH)
self.broadcast(activity, user)
if broadcast_only:
return

Expand All @@ -187,7 +186,7 @@ def reject(self):
actor=self.user_object.remote_id,
object=self.to_activity(),
).serialize()
self.broadcast(activity, self.user_object, queue=HIGH)
self.broadcast(activity, self.user_object)

self.delete()

Expand Down
6 changes: 3 additions & 3 deletions bookwyrm/models/shelf.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@

from bookwyrm import activitypub
from bookwyrm.settings import DOMAIN
from bookwyrm.tasks import LOW
from bookwyrm.tasks import BROADCAST
from .activitypub_mixin import CollectionItemMixin, OrderedCollectionMixin
from .base_model import BookWyrmModel
from . import fields
Expand Down Expand Up @@ -40,7 +40,7 @@ class Shelf(OrderedCollectionMixin, BookWyrmModel):

activity_serializer = activitypub.Shelf

def save(self, *args, priority=LOW, **kwargs):
def save(self, *args, priority=BROADCAST, **kwargs):
"""set the identifier"""
super().save(*args, priority=priority, **kwargs)
if not self.identifier:
Expand Down Expand Up @@ -100,7 +100,7 @@ class ShelfBook(CollectionItemMixin, BookWyrmModel):
activity_serializer = activitypub.ShelfItem
collection_field = "shelf"

def save(self, *args, priority=LOW, **kwargs):
def save(self, *args, priority=BROADCAST, **kwargs):
if not self.user:
self.user = self.shelf.user
if self.id and self.user.local:
Expand Down
6 changes: 3 additions & 3 deletions bookwyrm/models/user.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
from bookwyrm.preview_images import generate_user_preview_image_task
from bookwyrm.settings import DOMAIN, ENABLE_PREVIEW_IMAGES, USE_HTTPS, LANGUAGES
from bookwyrm.signatures import create_key_pair
from bookwyrm.tasks import app, LOW
from bookwyrm.tasks import app, MISC
from bookwyrm.utils import regex
from .activitypub_mixin import OrderedCollectionPageMixin, ActivitypubMixin
from .base_model import BookWyrmModel, DeactivationReason, new_access_code
Expand Down Expand Up @@ -469,7 +469,7 @@ def save(self, *args, **kwargs):
return super().save(*args, **kwargs)


@app.task(queue=LOW)
@app.task(queue=MISC)
def set_remote_server(user_id, allow_external_connections=False):
"""figure out the user's remote server in the background"""
user = User.objects.get(id=user_id)
Expand Down Expand Up @@ -528,7 +528,7 @@ def get_or_create_remote_server(
return server


@app.task(queue=LOW)
@app.task(queue=MISC)
def get_remote_reviews(outbox):
"""ingest reviews by a new remote bookwyrm user"""
outbox_page = outbox + "?page=true&type=Review"
Expand Down
Loading

0 comments on commit e1238e8

Please sign in to comment.