diff --git a/cl/api/views.py b/cl/api/views.py index b64ff9299b..8a48ff2729 100644 --- a/cl/api/views.py +++ b/cl/api/views.py @@ -85,7 +85,7 @@ def api_index(request: HttpRequest) -> HttpResponse: ) -def replication_docs(request: HttpRequest) -> HttpResponse: +async def replication_docs(request: HttpRequest) -> HttpResponse: return render(request, "replication.html", {"private": False}) @@ -159,7 +159,7 @@ def coverage_data(request, version, court): ) -def get_result_count(request, version, day_count): +async def get_result_count(request, version, day_count): """Get the count of results for the past `day_count` number of days GET parameters will be a complete search string @@ -199,7 +199,7 @@ def get_result_count(request, version, day_count): return JsonResponse({"count": response.result.numFound}, safe=True) -def deprecated_api(request, v): +async def deprecated_api(request, v): return JsonResponse( { "meta": { @@ -213,12 +213,12 @@ def deprecated_api(request, v): ) -def webhooks_getting_started(request): +async def webhooks_getting_started(request): context = {"private": False} return render(request, "webhooks-getting-started.html", context) -def webhooks_docs(request, version=None): +async def webhooks_docs(request, version=None): """Show the correct version of the webhooks docs""" context = {"private": False} diff --git a/cl/corpus_importer/management/commands/troller_bk.py b/cl/corpus_importer/management/commands/troller_bk.py index 47afbf5bd2..7d4acf6744 100644 --- a/cl/corpus_importer/management/commands/troller_bk.py +++ b/cl/corpus_importer/management/commands/troller_bk.py @@ -12,6 +12,7 @@ from typing import Any, DefaultDict, Mapping, TypedDict from urllib.parse import unquote +from asgiref.sync import async_to_sync, sync_to_async from dateutil.parser import ParserError from django.db import DataError, IntegrityError, transaction from django.db.models import Q @@ -45,7 +46,7 @@ FILES_BUFFER_THRESHOLD = 3 -def check_for_early_termination( +async def check_for_early_termination( court_id: str, docket: dict[str, Any] ) -> str | None: """Check for early termination, skip the rest of the file in case a cached @@ -58,13 +59,13 @@ def check_for_early_termination( omitted, "continue" if only the current item should be omitted or None. """ item_hash = hash_item(docket) - if is_cached(item_hash): + if await is_cached(item_hash): logger.info( f"Hit a cached item, finishing adding bulk entries for {court_id} feed. " ) return "break" - cache_hash(item_hash) + await cache_hash(item_hash) if ( not docket["pacer_case_id"] and not docket["docket_number"] @@ -228,7 +229,7 @@ def get_rds_to_add( return rds_to_create_bulk -def merge_rss_data( +async def merge_rss_data( feed_data: list[dict[str, Any]], court_id: str, build_date: datetime | None, @@ -242,7 +243,7 @@ def merge_rss_data( """ court_id = map_pacer_to_cl_id(court_id) - court = Court.objects.get(pk=court_id) + court = await Court.objects.aget(pk=court_id) dockets_created = 0 all_rds_created: list[int] = [] district_court_ids = ( @@ -255,7 +256,7 @@ def merge_rss_data( build_date and build_date > make_aware(datetime(year=2018, month=4, day=20), timezone.utc) - and court_id in district_court_ids + and await district_court_ids.filter(id=court_id).aexists() and court_id not in courts_exceptions_no_rss ): # Avoid parsing/adding feeds after we start scraping RSS Feeds for @@ -269,13 +270,13 @@ def merge_rss_data( str, list[dict[str, Any]] ] = defaultdict(list) for docket in feed_data: - skip_or_break = check_for_early_termination(court_id, docket) + skip_or_break = await check_for_early_termination(court_id, docket) if skip_or_break == "continue": continue elif skip_or_break == "break": break - d = find_docket_object( + d = await find_docket_object( court_id, docket["pacer_case_id"], docket["docket_number"], @@ -285,7 +286,9 @@ def merge_rss_data( if ( document_number and d.pk - and d.docket_entries.filter(entry_number=document_number).exists() + and await d.docket_entries.filter( + entry_number=document_number + ).aexists() ): # It's an existing docket entry; let's not add it. continue @@ -301,11 +304,11 @@ def merge_rss_data( ) if ( d.pk - and d.docket_entries.filter( + and await d.docket_entries.filter( query, date_filed=docket_entry["date_filed"], entry_number=docket_entry["document_number"], - ).exists() + ).aexists() ): # It's an existing docket entry; let's not add it. continue @@ -322,7 +325,7 @@ def merge_rss_data( # court and doesn't have a pacer_case_id continue - add_new_docket_from_rss( + await sync_to_async(add_new_docket_from_rss)( court_id, d, docket, @@ -338,15 +341,15 @@ def merge_rss_data( # docket entry to add in bulk. des_to_add_existing_docket.append((d.pk, docket_entry)) try: - d.save(update_fields=["source"]) - add_bankruptcy_data_to_docket(d, docket) + await d.asave(update_fields=["source"]) + await sync_to_async(add_bankruptcy_data_to_docket)(d, docket) except (DataError, IntegrityError) as exc: # Trouble. Log and move on logger.warn( f"Got DataError or IntegrityError while saving docket." ) - rds_created_pks, dockets_created = do_bulk_additions( + rds_created_pks, dockets_created = await sync_to_async(do_bulk_additions)( court_id, unique_dockets, dockets_to_create, @@ -601,7 +604,7 @@ def iterate_and_import_files( f"Skipping: {item_path=} with {court_id=} due to incorrect date format. \n" ) continue - rds_for_solr, dockets_created = merge_rss_data( + rds_for_solr, dockets_created = async_to_sync(merge_rss_data)( feed_data, court_id, build_date ) diff --git a/cl/corpus_importer/tasks.py b/cl/corpus_importer/tasks.py index 2e1d5b4ed7..95d6d98cf0 100644 --- a/cl/corpus_importer/tasks.py +++ b/cl/corpus_importer/tasks.py @@ -9,6 +9,7 @@ import internetarchive as ia import requests +from asgiref.sync import async_to_sync from celery import Task from celery.exceptions import SoftTimeLimitExceeded from django.conf import settings @@ -664,7 +665,9 @@ def get_and_process_free_pdf( # Get the data temporarily. OCR is done for all nightly free # docs in a separate batch, but may as well do the easy ones. - extract_recap_pdf_base(rd.pk, ocr_available=False, check_if_needed=False) + async_to_sync(extract_recap_pdf_base)( + rd.pk, ocr_available=False, check_if_needed=False + ) return {"result": result, "rd_pk": rd.pk} @@ -1056,7 +1059,7 @@ def do_case_query_by_pacer_case_id( # Merge the contents into CL. if d is None: - d = find_docket_object( + d = async_to_sync(find_docket_object)( court_id, pacer_case_id, docket_data["docket_number"] ) @@ -1184,7 +1187,7 @@ def make_docket_by_iquery( ) return None - d = find_docket_object( + d = async_to_sync(find_docket_object)( court_id, str(pacer_case_id), report.data["docket_number"], @@ -1287,7 +1290,7 @@ def get_docket_by_pacer_case_id( return None if d is None: - d = find_docket_object( + d = async_to_sync(find_docket_object)( court_id, pacer_case_id, docket_data["docket_number"] ) @@ -1365,7 +1368,9 @@ def get_appellate_docket_by_docket_number( d = None if d is None: - d = find_docket_object(court_id, docket_number, docket_number) + d = async_to_sync(find_docket_object)( + court_id, docket_number, docket_number + ) rds_created, content_updated = merge_pacer_docket_into_cl_docket( d, @@ -1676,12 +1681,12 @@ def get_document_number_for_appellate( pdf_bytes = local_path.read() if pdf_bytes: # For other jurisdictions try first to get it from the PDF document. - dn_response = microservice( + dn_response = async_to_sync(microservice)( service="document-number", file_type="pdf", file=pdf_bytes, ) - if dn_response.ok and dn_response.text: + if dn_response.is_success and dn_response.text: document_number = dn_response.text if not document_number and pacer_doc_id: @@ -1783,11 +1788,11 @@ def update_rd_metadata( # request.content is sometimes a str, sometimes unicode, so # force it all to be bytes, pleasing hashlib. rd.sha1 = sha1(pdf_bytes) - response = microservice( + response = async_to_sync(microservice)( service="page-count", item=rd, ) - if response.ok: + if response.is_success: rd.page_count = response.text # Save and extract, skipping OCR. @@ -1978,7 +1983,7 @@ def get_pacer_doc_by_rd_and_description( return # Skip OCR for now. It'll happen in a second step. - extract_recap_pdf_base(rd.pk, ocr_available=False) + async_to_sync(extract_recap_pdf_base)(rd.pk, ocr_available=False) add_items_to_solr([rd.pk], "search.RECAPDocument") diff --git a/cl/corpus_importer/tests.py b/cl/corpus_importer/tests.py index 39c7fca815..5a11afa4fc 100644 --- a/cl/corpus_importer/tests.py +++ b/cl/corpus_importer/tests.py @@ -9,6 +9,7 @@ import eyecite import pytest +from asgiref.sync import async_to_sync from django.conf import settings from django.core.files.base import ContentFile from django.utils.timezone import make_aware @@ -1144,7 +1145,7 @@ def test_merge_district_rss_before_2018(self): self.assertEqual( len(self.docket_d_before_2018.docket_entries.all()), 0 ) - rds_created, d_created = merge_rss_data( + rds_created, d_created = async_to_sync(merge_rss_data)( [d_rss_data_before_2018], self.court.pk, build_date ) self.assertEqual(len(rds_created), 1) @@ -1187,7 +1188,7 @@ def test_avoid_merging_district_rss_after_2018(self): build_date = d_rss_data_after_2018["docket_entries"][0]["date_filed"] self.assertEqual(len(self.docket_d_after_2018.docket_entries.all()), 0) - rds_created, d_created = merge_rss_data( + rds_created, d_created = async_to_sync(merge_rss_data)( [d_rss_data_after_2018], self.court.pk, build_date ) self.assertEqual(len(rds_created), 0) @@ -1226,7 +1227,7 @@ def test_merge_district_courts_rss_exceptions_after_2018(self): build_date = d_rss_data_after_2018["docket_entries"][0]["date_filed"] self.assertEqual(len(self.docket_d_after_2018.docket_entries.all()), 0) - rds_created, d_created = merge_rss_data( + rds_created, d_created = async_to_sync(merge_rss_data)( [d_rss_data_after_2018], self.court_pamd.pk, build_date ) self.assertEqual(len(rds_created), 1) @@ -1266,7 +1267,7 @@ def test_merging_district_docket_with_entries_before_2018(self): self.assertEqual( len(self.de_d_before_2018.docket.docket_entries.all()), 1 ) - rds_created, d_created = merge_rss_data( + rds_created, d_created = async_to_sync(merge_rss_data)( [d_rss_data_before_2018], self.court.pk, build_date ) self.assertEqual(len(rds_created), 1) @@ -1310,7 +1311,7 @@ def test_avoid_merging_updating_docket_item_without_docket_entries( self.assertEqual( len(self.de_d_before_2018.docket.docket_entries.all()), 1 ) - rds_created, d_created = merge_rss_data( + rds_created, d_created = async_to_sync(merge_rss_data)( [d_rss_data_before_2018], self.court.pk, build_date ) self.assertEqual(len(rds_created), 0) @@ -1344,7 +1345,7 @@ def test_add_new_district_rss_before_2018(self): build_date = d_rss_data_before_2018["docket_entries"][0]["date_filed"] dockets = Docket.objects.filter(pacer_case_id="43562") self.assertEqual(dockets.count(), 0) - rds_created, d_created = merge_rss_data( + rds_created, d_created = async_to_sync(merge_rss_data)( [d_rss_data_before_2018], self.court.pk, build_date ) self.assertEqual(len(rds_created), 1) @@ -1384,7 +1385,7 @@ def test_avoid_merging_rss_docket_with_entries_district_after_2018(self): self.assertEqual( len(self.de_d_before_2018.docket.docket_entries.all()), 1 ) - rds_created, d_created = merge_rss_data( + rds_created, d_created = async_to_sync(merge_rss_data)( [d_rss_data_after_2018], self.court.pk, build_date ) self.assertEqual(len(rds_created), 0) @@ -1426,7 +1427,7 @@ def test_avoid_adding_new_district_rss_after_2018(self): ) build_date = d_rss_data_after_2018["docket_entries"][0]["date_filed"] - rds_created, d_created = merge_rss_data( + rds_created, d_created = async_to_sync(merge_rss_data)( [d_rss_data_after_2018], self.court.pk, build_date ) self.assertEqual(len(rds_created), 0) @@ -1461,7 +1462,7 @@ def test_merge_appellate_rss_before_2018(self): self.assertEqual( len(self.docket_a_before_2018.docket_entries.all()), 0 ) - rds_created, d_created = merge_rss_data( + rds_created, d_created = async_to_sync(merge_rss_data)( [a_rss_data_before_2018], self.court_appellate.pk, build_date ) self.assertEqual(len(rds_created), 1) @@ -1502,7 +1503,7 @@ def test_merging_appellate_rss_after_2018(self): build_date = a_rss_data_after_2018["docket_entries"][0]["date_filed"] self.assertEqual(len(self.docket_a_after_2018.docket_entries.all()), 0) - rds_created, d_created = merge_rss_data( + rds_created, d_created = async_to_sync(merge_rss_data)( [a_rss_data_after_2018], self.court_appellate.pk, build_date ) self.assertEqual(len(rds_created), 1) @@ -1545,7 +1546,7 @@ def test_avoid_merging_existing_appellate_entry_before_2018(self): self.assertEqual( len(self.de_a_before_2018.docket.docket_entries.all()), 1 ) - rds_created, d_created = merge_rss_data( + rds_created, d_created = async_to_sync(merge_rss_data)( [a_rss_data_before_2018], self.court_appellate.pk, build_date ) self.assertEqual(len(rds_created), 1) @@ -1589,7 +1590,7 @@ def test_merge_new_appellate_rss_before_2018(self): build_date = a_rss_data_before_2018["docket_entries"][0]["date_filed"] dockets = Docket.objects.filter(docket_number="23-4233") self.assertEqual(dockets.count(), 0) - rds_created, d_created = merge_rss_data( + rds_created, d_created = async_to_sync(merge_rss_data)( [a_rss_data_before_2018], self.court_appellate.pk, build_date ) self.assertEqual(len(rds_created), 1) @@ -1629,7 +1630,7 @@ def test_avoid_merging_existing_appellate_entry_after_2018(self): self.assertEqual( len(self.de_a_before_2018.docket.docket_entries.all()), 1 ) - rds_created, d_created = merge_rss_data( + rds_created, d_created = async_to_sync(merge_rss_data)( [a_rss_data_before_2018], self.court_appellate.pk, build_date ) self.assertEqual(len(rds_created), 0) @@ -1665,7 +1666,7 @@ def test_merging_appellate_docket_with_entries_after_2018(self): self.assertEqual( len(self.de_a_before_2018.docket.docket_entries.all()), 1 ) - rds_created, d_created = merge_rss_data( + rds_created, d_created = async_to_sync(merge_rss_data)( [a_rss_data_before_2018], self.court_appellate.pk, build_date ) self.assertEqual(len(rds_created), 1) @@ -1710,7 +1711,7 @@ def test_merge_new_appellate_rss_after_2018(self): build_date = d_rss_data_after_2018["docket_entries"][0]["date_filed"] dockets = Docket.objects.filter(docket_number="45-3232") self.assertEqual(dockets.count(), 0) - rds_created, d_created = merge_rss_data( + rds_created, d_created = async_to_sync(merge_rss_data)( [d_rss_data_after_2018], self.court_appellate.pk, build_date ) self.assertEqual(len(rds_created), 1) @@ -1745,7 +1746,7 @@ def test_merging_appellate_docket_with_entries_case_id(self): self.assertEqual( len(self.docket_a_2018_case_id.docket_entries.all()), 0 ) - rds_created, d_created = merge_rss_data( + rds_created, d_created = async_to_sync(merge_rss_data)( [a_rss_data_before_2018], self.court_appellate.pk, build_date ) self.assertEqual(len(rds_created), 1) @@ -1805,7 +1806,7 @@ def test_merge_mapped_court_rss_before_2018(self): build_date = d_rss_data_before_2018["docket_entries"][0]["date_filed"] dockets = Docket.objects.filter(docket_number="3:20-CV-01473") self.assertEqual(dockets.count(), 0) - rds_created, d_created = merge_rss_data( + rds_created, d_created = async_to_sync(merge_rss_data)( [d_rss_data_before_2018], "neb", build_date ) self.assertEqual(len(rds_created), 1) @@ -1843,7 +1844,7 @@ def test_avoid_merging_district_mapped_court_rss_after_2018(self): ], ) build_date = d_rss_data_after_2018["docket_entries"][0]["date_filed"] - rds_created, d_created = merge_rss_data( + rds_created, d_created = async_to_sync(merge_rss_data)( [d_rss_data_after_2018], "neb", build_date ) self.assertEqual(len(rds_created), 0) @@ -1895,7 +1896,7 @@ def test_avoid_updating_docket_entry_metadata(self): ) build_date = a_rss_data_unnumbered["docket_entries"][0]["date_filed"] self.assertEqual(len(de_a_unnumbered.docket.docket_entries.all()), 1) - rds_created, d_created = merge_rss_data( + rds_created, d_created = async_to_sync(merge_rss_data)( [a_rss_data_unnumbered], self.court_appellate.pk, build_date ) self.assertEqual(len(rds_created), 0) @@ -1966,7 +1967,7 @@ def test_avoid_cached_items(self, mock_logger): cached_items = RssItemCache.objects.all() self.assertEqual(cached_items.count(), 0) build_date = a_rss_data_0["docket_entries"][0]["date_filed"] - rds_created, d_created = merge_rss_data( + rds_created, d_created = async_to_sync(merge_rss_data)( list_rss_data_1, self.court_appellate.pk, build_date ) self.assertEqual(len(rds_created), 2) @@ -1975,7 +1976,7 @@ def test_avoid_cached_items(self, mock_logger): # Remove recap_sequence_number from the dict to simulate the same item del a_rss_data_1["docket_entries"][0]["recap_sequence_number"] - rds_created, d_created = merge_rss_data( + rds_created, d_created = async_to_sync(merge_rss_data)( list_rss_data_2, self.court_appellate.pk, build_date ) @@ -2127,7 +2128,7 @@ def test_add_objects_in_bulk(self): self.assertEqual(cached_items.count(), 0) build_date = a_rss_data_0["docket_entries"][0]["date_filed"] - rds_created, d_created = merge_rss_data( + rds_created, d_created = async_to_sync(merge_rss_data)( list_rss_data, self.court_appellate.pk, build_date ) @@ -2233,7 +2234,7 @@ def test_avoid_adding_district_dockets_no_pacer_case_id_in_bulk(self): ] build_date = a_rss_data_0["docket_entries"][0]["date_filed"] - rds_created, d_created = merge_rss_data( + rds_created, d_created = async_to_sync(merge_rss_data)( list_rss_data, self.court_neb.pk, build_date ) @@ -2296,7 +2297,7 @@ def test_avoid_adding_existing_entries_by_description(self): a_rss_data_0, ] build_date = a_rss_data_0["docket_entries"][0]["date_filed"] - rds_created, d_created = merge_rss_data( + rds_created, d_created = async_to_sync(merge_rss_data)( list_rss_data, self.court.pk, build_date ) diff --git a/cl/disclosures/tasks.py b/cl/disclosures/tasks.py index e33dfb0771..9d33839be0 100644 --- a/cl/disclosures/tasks.py +++ b/cl/disclosures/tasks.py @@ -2,6 +2,7 @@ from typing import Optional, Union import requests +from asgiref.sync import async_to_sync from dateutil.parser import ParserError, parse from django.conf import settings from django.core.exceptions import ValidationError @@ -55,12 +56,12 @@ def make_financial_disclosure_thumbnail_from_pdf(self, pk: int) -> None: disclosure = FinancialDisclosure.objects.select_for_update().get(pk=pk) pdf_content = disclosure.filepath.read() - response = microservice( + response = async_to_sync(microservice)( service="generate-thumbnail", file_type="pdf", file=pdf_content, ) - if not response.ok: + if not response.is_success: if self.request.retries == self.max_retries: disclosure.thumbnail_status = THUMBNAIL_STATUSES.FAILED disclosure.save() @@ -88,13 +89,13 @@ def extract_content( # Extraction takes between 7 seconds and 80 minutes for super # long Trump extraction with ~5k investments - response = microservice( + response = async_to_sync(microservice)( service="extract-disclosure", file_type="pdf", file=pdf_bytes, ) - if not response.ok: + if not response.is_success: logger.warning( msg="Could not extract data from this document", extra=dict( @@ -327,7 +328,7 @@ def save_and_upload_disclosure( if len(disclosure) > 0: return disclosure[0] - page_count = microservice( + page_count = async_to_sync(microservice)( service="page-count", file_type="pdf", file=response.content, diff --git a/cl/disclosures/tests.py b/cl/disclosures/tests.py index 281537dcb5..8f7ab244db 100644 --- a/cl/disclosures/tests.py +++ b/cl/disclosures/tests.py @@ -1,6 +1,7 @@ import json import os +from asgiref.sync import async_to_sync from django.conf import settings from django.urls import reverse from selenium.common.exceptions import NoSuchElementException @@ -86,7 +87,7 @@ def test_extraction_and_ingestion_jef(self) -> None: pdf_bytes = f.read() Investment.objects.all().delete() - extracted_data = microservice( + extracted_data = async_to_sync(microservice)( service="extract-disclosure", file_type="pdf", file=pdf_bytes, diff --git a/cl/lib/microservice_utils.py b/cl/lib/microservice_utils.py index 257eab7f08..4ab1e1e61f 100644 --- a/cl/lib/microservice_utils.py +++ b/cl/lib/microservice_utils.py @@ -1,17 +1,18 @@ +from io import BufferedReader + from django.conf import settings -from django.db import models -from requests import Request, Response, Session +from httpx import AsyncClient, Response from cl.audio.models import Audio from cl.lib.search_utils import clean_up_recap_document_file from cl.search.models import Opinion, RECAPDocument -def microservice( +async def microservice( service: str, method: str = "POST", - item: models.Model | None = None, - file: bytes | None = None, + item: RECAPDocument | Opinion | Audio | None = None, + file: BufferedReader | None = None, file_type: str | None = None, filepath: str | None = None, data=None, @@ -38,50 +39,55 @@ def microservice( services = settings.MICROSERVICE_URLS - req = Request( - method=method, - url=services[service]["url"], # type: ignore - ) + files = None # Add file from filepath if filepath: - with open(filepath, "rb") as f: - req.files = {"file": (filepath, f.read())} + files = {"file": (filepath, open(filepath, "rb"))} # Handle our documents based on the type of model object # Sadly these are not uniform if item: if type(item) == RECAPDocument: try: - with item.filepath_local.open(mode="rb") as local_path: - req.files = { - "file": (item.filepath_local.name, local_path.read()) - } + files = { + "file": ( + item.filepath_local.name, + item.filepath_local.open(mode="rb"), + ) + } except FileNotFoundError: # The file is no longer available, clean it up in DB - clean_up_recap_document_file(item) + await clean_up_recap_document_file(item) elif type(item) == Opinion: - with item.local_path.open(mode="rb") as local_path: - req.files = {"file": (item.local_path.name, local_path.read())} + files = { + "file": ( + item.local_path.name, + item.local_path.open(mode="rb"), + ) + } elif type(item) == Audio: - with item.local_path_original_file.open(mode="rb") as local_path: - req.files = { - "file": ( - item.local_path_original_file.name, - local_path.read(), - ) - } + files = { + "file": ( + item.local_path_original_file.name, + item.local_path_original_file.open(mode="rb"), + ) + } # Sometimes we will want to pass in a filename and the file bytes # to avoid writing them to disk. Filename can often be generic # and is used to identify the file extension for our microservices if file and file_type: - req.files = {"file": (f"dummy.{file_type}", file)} + files = {"file": (f"dummy.{file_type}", file)} elif file: - req.files = {"file": (f"filename", file)} - - if data: - req.data = data + files = {"file": (f"filename", file)} - if params: - req.params = params + async with AsyncClient(follow_redirects=True, http2=True) as client: + req = client.build_request( + method=method, + url=services[service]["url"], # type: ignore + data=data, + files=files, + params=params, + timeout=services[service]["timeout"], + ) - return Session().send(req.prepare(), timeout=services[service]["timeout"]) # type: ignore + return await client.send(req) diff --git a/cl/lib/search_utils.py b/cl/lib/search_utils.py index 98643f2ce3..b80dab3e9a 100644 --- a/cl/lib/search_utils.py +++ b/cl/lib/search_utils.py @@ -3,6 +3,7 @@ from typing import Any, Dict, List, Optional, Tuple, Union, cast from urllib.parse import parse_qs, urlencode +from asgiref.sync import sync_to_async from django.conf import settings from django.core.cache import cache, caches from django.http import HttpRequest, QueryDict @@ -1283,7 +1284,7 @@ def get_mlt_query( return si.mlt_query(hl_fields).add_extra(**q) -def clean_up_recap_document_file(item: RECAPDocument) -> None: +async def clean_up_recap_document_file(item: RECAPDocument) -> None: """Clean up the RecapDocument file-related fields after detecting the file doesn't exist in the storage. @@ -1292,10 +1293,10 @@ def clean_up_recap_document_file(item: RECAPDocument) -> None: """ if type(item) == RECAPDocument: - item.filepath_local.delete() + await sync_to_async(item.filepath_local.delete)() item.sha1 = "" item.date_upload = None item.file_size = None item.page_count = None item.is_available = False - item.save() + await item.asave() diff --git a/cl/lib/thumbnails.py b/cl/lib/thumbnails.py index 3647af2db0..9631abbb7f 100644 --- a/cl/lib/thumbnails.py +++ b/cl/lib/thumbnails.py @@ -1,5 +1,6 @@ from typing import Any +from asgiref.sync import async_to_sync from django.core.files.base import ContentFile from cl.lib.microservice_utils import microservice @@ -21,7 +22,7 @@ def make_png_thumbnail_for_instance( :param max_dimension: The longest you want any edge to be """ item = klass.objects.get(pk=pk) - response = microservice( + response = async_to_sync(microservice)( service="generate-thumbnail", item=item, params={"max_dimension": max_dimension}, diff --git a/cl/recap/mergers.py b/cl/recap/mergers.py index 7df85da1e5..51ac7f0a63 100644 --- a/cl/recap/mergers.py +++ b/cl/recap/mergers.py @@ -5,6 +5,7 @@ from datetime import date, timedelta from typing import Any, Dict, List, Optional, Tuple, Union +from asgiref.sync import async_to_sync from django.core.exceptions import ValidationError from django.core.files.base import ContentFile from django.db import IntegrityError, OperationalError, transaction @@ -80,7 +81,7 @@ def confirm_docket_number_core_lookup_match( return docket -def find_docket_object( +async def find_docket_object( court_id: str, pacer_case_id: str | None, docket_number: str, @@ -134,11 +135,11 @@ def find_docket_object( for kwargs in lookups: ds = Docket.objects.filter(court_id=court_id, **kwargs).using(using) - count = ds.count() + count = await ds.acount() if count == 0: continue # Try a looser lookup. if count == 1: - d = ds[0] + d = await ds.afirst() if kwargs.get("pacer_case_id") is None and kwargs.get( "docket_number_core" ): @@ -147,7 +148,7 @@ def find_docket_object( break # Nailed it! elif count > 1: # Choose the oldest one and live with it. - d = ds.earliest("date_created") + d = await ds.aearliest("date_created") if kwargs.get("pacer_case_id") is None and kwargs.get( "docket_number_core" ): @@ -164,7 +165,7 @@ def find_docket_object( if using != "default": # Get the item from the default DB - d = Docket.objects.get(pk=d.pk) + d = await Docket.objects.aget(pk=d.pk) return d @@ -1262,7 +1263,7 @@ def get_data_from_appellate_att_report( return att_data -def add_tags_to_objs(tag_names: List[str], objs: Any) -> QuerySet: +async def add_tags_to_objs(tag_names: List[str], objs: Any) -> QuerySet: """Add tags by name to objects :param tag_names: A list of tag name strings @@ -1276,7 +1277,7 @@ def add_tags_to_objs(tag_names: List[str], objs: Any) -> QuerySet: tags = [] for tag_name in tag_names: - tag, _ = Tag.objects.get_or_create(name=tag_name) + tag, _ = await Tag.objects.aget_or_create(name=tag_name) tags.append(tag) for tag in tags: @@ -1306,7 +1307,7 @@ def merge_pacer_docket_into_cl_docket( og_info.save() d.originating_court_information = og_info - tags = add_tags_to_objs(tag_names, [d]) + tags = async_to_sync(add_tags_to_objs)(tag_names, [d]) # Add the HTML to the docket in case we need it someday. upload_type = ( @@ -1526,7 +1527,7 @@ def process_orphan_documents( try: from cl.recap.tasks import process_recap_pdf - process_recap_pdf(pq) + async_to_sync(process_recap_pdf)(pq) except: # We can ignore this. If we don't, we get all of the # exceptions that were previously raised for the diff --git a/cl/recap/tasks.py b/cl/recap/tasks.py index f882eca460..fa466fc312 100644 --- a/cl/recap/tasks.py +++ b/cl/recap/tasks.py @@ -1,3 +1,4 @@ +import asyncio import hashlib import logging from dataclasses import dataclass @@ -6,6 +7,7 @@ from zipfile import ZipFile import requests +from asgiref.sync import async_to_sync, sync_to_async from botocore import exceptions as botocore_exception from celery import Task from celery.canvas import chain @@ -96,47 +98,42 @@ cnt = CaseNameTweaker() -def process_recap_upload(pq: ProcessingQueue) -> None: +async def process_recap_upload(pq: ProcessingQueue) -> None: """Process an item uploaded from an extension or API user. Uploaded objects can take a variety of forms, and we'll need to process them accordingly. """ if pq.upload_type == UPLOAD_TYPE.DOCKET: - chain( - process_recap_docket.s(pq.pk), add_or_update_recap_docket.s() - ).apply_async() + docket = await process_recap_docket(pq.pk) + await sync_to_async(add_or_update_recap_docket)(docket) elif pq.upload_type == UPLOAD_TYPE.ATTACHMENT_PAGE: - process_recap_attachment.delay(pq.pk) + await process_recap_attachment(pq.pk) elif pq.upload_type == UPLOAD_TYPE.PDF: - process_recap_pdf.delay(pq.pk) + await process_recap_pdf(pq.pk) elif pq.upload_type == UPLOAD_TYPE.DOCKET_HISTORY_REPORT: - chain( - process_recap_docket_history_report.s(pq.pk), - add_or_update_recap_docket.s(), - ).apply_async() + docket = await process_recap_docket_history_report(pq.pk) + await sync_to_async(add_or_update_recap_docket)(docket) elif pq.upload_type == UPLOAD_TYPE.APPELLATE_DOCKET: - chain( - process_recap_appellate_docket.s(pq.pk), - add_or_update_recap_docket.s(), - ).apply_async() + docket = await process_recap_appellate_docket(pq.pk) + await sync_to_async(add_or_update_recap_docket)(docket) elif pq.upload_type == UPLOAD_TYPE.APPELLATE_ATTACHMENT_PAGE: - process_recap_appellate_attachment.delay(pq.pk) + await process_recap_appellate_attachment(pq.pk) elif pq.upload_type == UPLOAD_TYPE.CLAIMS_REGISTER: - process_recap_claims_register.delay(pq.pk) + await process_recap_claims_register(pq.pk) elif pq.upload_type == UPLOAD_TYPE.DOCUMENT_ZIP: - process_recap_zip.delay(pq.pk) + await process_recap_zip(pq.pk) elif pq.upload_type == UPLOAD_TYPE.CASE_QUERY_PAGE: - chain( - process_case_query_page.s(pq.pk), - add_or_update_recap_docket.s(), - ).apply_async() + docket = await process_case_query_page(pq.pk) + await sync_to_async(add_or_update_recap_docket)(docket) elif pq.upload_type == UPLOAD_TYPE.APPELLATE_CASE_QUERY_PAGE: - process_recap_appellate_case_query_page.delay(pq.pk) + await sync_to_async(process_recap_appellate_case_query_page)(pq.pk) elif pq.upload_type == UPLOAD_TYPE.CASE_QUERY_RESULT_PAGE: - process_recap_case_query_result_page.delay(pq.pk) + await sync_to_async(process_recap_case_query_result_page)(pq.pk) elif pq.upload_type == UPLOAD_TYPE.APPELLATE_CASE_QUERY_RESULT_PAGE: - process_recap_appellate_case_query_result_page.delay(pq.pk) + await sync_to_async(process_recap_appellate_case_query_result_page)( + pq.pk + ) def do_pacer_fetch(fq: PacerFetchQueue): @@ -168,7 +165,7 @@ def do_pacer_fetch(fq: PacerFetchQueue): return result -def mark_pq_successful(pq, d_id=None, de_id=None, rd_id=None): +async def mark_pq_successful(pq, d_id=None, de_id=None, rd_id=None): """Mark the processing queue item as successfully completed. :param pq: The ProcessingQueue object to manipulate @@ -180,7 +177,7 @@ def mark_pq_successful(pq, d_id=None, de_id=None, rd_id=None): applies to document uploads (obviously). """ # Ditch the original file - pq.filepath_local.delete(save=False) + await sync_to_async(pq.filepath_local.delete)(save=False) if pq.debug: pq.error_message = "Successful debugging upload! Nice work." else: @@ -189,11 +186,13 @@ def mark_pq_successful(pq, d_id=None, de_id=None, rd_id=None): pq.docket_id = d_id pq.docket_entry_id = de_id pq.recap_document_id = rd_id - pq.save() + await pq.asave() return pq.status, pq.error_message -def mark_pq_status(pq, msg, status, message_property_name="error_message"): +async def mark_pq_status( + pq, msg, status, message_property_name="error_message" +): """Mark the processing queue item as some process, and log the message. :param pq: The ProcessingQueue object to manipulate @@ -205,26 +204,19 @@ def mark_pq_status(pq, msg, status, message_property_name="error_message"): logger.info(msg) setattr(pq, message_property_name, msg) pq.status = status - pq.save() + await pq.asave() return pq.status, getattr(pq, message_property_name) -@app.task( - bind=True, - autoretry_for=(requests.ConnectionError, requests.ReadTimeout), - max_retries=5, - interval_start=5 * 60, - interval_step=10 * 60, -) -def process_recap_pdf(self, pk): +async def process_recap_pdf(pk): """Process an uploaded PDF from the RECAP API endpoint. :param pk: The PK of the processing queue item you want to work on. :return: A RECAPDocument object that was created or updated. """ """Save a RECAP PDF to the database.""" - pq = ProcessingQueue.objects.get(pk=pk) - mark_pq_status(pq, "", PROCESSING_STATUS.IN_PROGRESS) + pq = await ProcessingQueue.objects.aget(pk=pk) + await mark_pq_status(pq, "", PROCESSING_STATUS.IN_PROGRESS) if pq.attachment_number is None: document_type = RECAPDocument.PACER_DOCUMENT @@ -234,81 +226,94 @@ def process_recap_pdf(self, pk): logger.info(f"Processing RECAP item (debug is: {pq.debug}): {pq} ") try: if pq.pacer_case_id: - rd = RECAPDocument.objects.get( + rd = await RECAPDocument.objects.aget( docket_entry__docket__pacer_case_id=pq.pacer_case_id, pacer_doc_id=pq.pacer_doc_id, ) else: # Sometimes we don't have the case ID from PACER. Try to make this # work anyway. - rd = RECAPDocument.objects.get(pacer_doc_id=pq.pacer_doc_id) + rd = await RECAPDocument.objects.aget(pacer_doc_id=pq.pacer_doc_id) except (RECAPDocument.DoesNotExist, RECAPDocument.MultipleObjectsReturned): - try: - d = Docket.objects.get( - pacer_case_id=pq.pacer_case_id, court_id=pq.court_id - ) - except Docket.DoesNotExist as exc: - # No Docket and no RECAPDocument. Do a retry. Hopefully - # the docket will be in place soon (it could be in a - # different upload task that hasn't yet been processed). - logger.warning( - "Unable to find docket for processing queue '%s'. " - "Retrying if max_retries is not exceeded." % pq - ) - error_message = "Unable to find docket for item." - if (self.request.retries == self.max_retries) or pq.debug: - mark_pq_status(pq, error_message, PROCESSING_STATUS.FAILED) + retries = 5 + while True: + try: + d = await Docket.objects.aget( + pacer_case_id=pq.pacer_case_id, court_id=pq.court_id + ) + except Docket.DoesNotExist as exc: + # No Docket and no RECAPDocument. Do a retry. Hopefully + # the docket will be in place soon (it could be in a + # different upload task that hasn't yet been processed). + logger.warning( + "Unable to find docket for processing queue '%s'. " + "Retrying if max_retries is not exceeded." % pq + ) + error_message = "Unable to find docket for item." + if retries > 0: + retries -= 1 + await mark_pq_status( + pq, error_message, PROCESSING_STATUS.QUEUED_FOR_RETRY + ) + await asyncio.sleep(1) + continue + await mark_pq_status( + pq, error_message, PROCESSING_STATUS.FAILED + ) + raise exc + except Docket.MultipleObjectsReturned: + msg = f"Too many dockets found when trying to save '{pq}'" + await mark_pq_status(pq, msg, PROCESSING_STATUS.FAILED) return None else: - mark_pq_status( - pq, error_message, PROCESSING_STATUS.QUEUED_FOR_RETRY - ) - raise self.retry(exc=exc) - except Docket.MultipleObjectsReturned: - msg = f"Too many dockets found when trying to save '{pq}'" - mark_pq_status(pq, msg, PROCESSING_STATUS.FAILED) - return None + break # Got the Docket, attempt to get/create the DocketEntry, and then # create the RECAPDocument - try: - de = DocketEntry.objects.get( - docket=d, entry_number=pq.document_number - ) - except DocketEntry.DoesNotExist as exc: - logger.warning( - f"Unable to find docket entry for processing queue '{pq}'." - ) - msg = "Unable to find docket entry for item." - if (self.request.retries == self.max_retries) or pq.debug: - mark_pq_status(pq, msg, PROCESSING_STATUS.FAILED) - return None - else: - mark_pq_status(pq, msg, PROCESSING_STATUS.QUEUED_FOR_RETRY) - raise self.retry(exc=exc) - else: - # If we're here, we've got the docket and docket - # entry, but were unable to find the document by - # pacer_doc_id. This happens when pacer_doc_id is - # missing, for example. ∴, try to get the document - # from the docket entry. + retries = 5 + while True: try: - rd = RECAPDocument.objects.get( - docket_entry=de, - document_number=pq.document_number, - attachment_number=pq.attachment_number, - document_type=document_type, + de = await DocketEntry.objects.aget( + docket=d, entry_number=pq.document_number ) - except ( - RECAPDocument.DoesNotExist, - RECAPDocument.MultipleObjectsReturned, - ): - # Unable to find it. Make a new item. - rd = RECAPDocument( - docket_entry=de, - pacer_doc_id=pq.pacer_doc_id, - document_type=document_type, + except DocketEntry.DoesNotExist as exc: + logger.warning( + f"Unable to find docket entry for processing queue '{pq}'." ) + msg = "Unable to find docket entry for item." + if retries > 0: + retries -= 1 + await mark_pq_status( + pq, msg, PROCESSING_STATUS.QUEUED_FOR_RETRY + ) + await asyncio.sleep(1) + continue + await mark_pq_status(pq, msg, PROCESSING_STATUS.FAILED) + raise exc + else: + # If we're here, we've got the docket and docket + # entry, but were unable to find the document by + # pacer_doc_id. This happens when pacer_doc_id is + # missing, for example. ∴, try to get the document + # from the docket entry. + try: + rd = await RECAPDocument.objects.aget( + docket_entry=de, + document_number=pq.document_number, + attachment_number=pq.attachment_number, + document_type=document_type, + ) + except ( + RECAPDocument.DoesNotExist, + RECAPDocument.MultipleObjectsReturned, + ): + # Unable to find it. Make a new item. + rd = await RECAPDocument.objects.acreate( + docket_entry=de, + pacer_doc_id=pq.pacer_doc_id, + document_type=document_type, + ) + break rd.document_number = pq.document_number rd.attachment_number = pq.attachment_number @@ -319,12 +324,8 @@ def process_recap_pdf(self, pk): new_sha1 = hashlib.file_digest(f, "sha1").hexdigest() except IOError as exc: msg = f"Internal processing error ({exc.errno}: {exc.strerror})." - if (self.request.retries == self.max_retries) or pq.debug: - mark_pq_status(pq, msg, PROCESSING_STATUS.FAILED) - return None - else: - mark_pq_status(pq, msg, PROCESSING_STATUS.QUEUED_FOR_RETRY) - raise self.retry(exc=exc) + await mark_pq_status(pq, msg, PROCESSING_STATUS.FAILED) + return None existing_document = all( [ @@ -336,22 +337,26 @@ def process_recap_pdf(self, pk): if not existing_document: # Different sha1, it wasn't available, or it's missing from disk. Move # the new file over from the processing queue storage. + docket_entry = await DocketEntry.objects.aget(id=rd.docket_entry_id) + docket = await Docket.objects.aget(id=docket_entry.docket_id) file_name = get_document_filename( - rd.docket_entry.docket.court_id, - rd.docket_entry.docket.pacer_case_id, + docket.court_id, + docket.pacer_case_id, rd.document_number, rd.attachment_number, ) if not pq.debug: with pq.filepath_local.open("rb") as f: - rd.filepath_local.save(file_name, File(f), save=False) + await sync_to_async(rd.filepath_local.save)( + file_name, File(f), save=False + ) # Do page count and extraction - response = microservice( + response = await microservice( service="page-count", item=rd, ) - if response.ok: + if response.is_success: rd.page_count = response.text rd.file_size = rd.filepath_local.size @@ -362,29 +367,29 @@ def process_recap_pdf(self, pk): if not pq.debug: try: - rd.save() + await rd.asave() except (IntegrityError, ValidationError): msg = "Duplicate key on unique_together constraint" - mark_pq_status(pq, msg, PROCESSING_STATUS.FAILED) + await mark_pq_status(pq, msg, PROCESSING_STATUS.FAILED) rd.filepath_local.delete(save=False) return None if not existing_document and not pq.debug: - extract_recap_pdf_base(rd.pk), - add_items_to_solr([rd.pk], "search.RECAPDocument") + await extract_recap_pdf_base(rd.pk), + await sync_to_async(add_items_to_solr)([rd.pk], "search.RECAPDocument") - mark_pq_successful( + await mark_pq_successful( pq, d_id=rd.docket_entry.docket_id, de_id=rd.docket_entry_id, rd_id=rd.pk, ) - mark_ia_upload_needed(rd.docket_entry.docket, save_docket=True) + docket = await Docket.objects.aget(id=rd.docket_entry.docket_id) + await sync_to_async(mark_ia_upload_needed)(docket, save_docket=True) return rd -@app.task(bind=True, ignore_result=True) -def process_recap_zip(self, pk: int) -> dict[str, list[int] | list[Task]]: +async def process_recap_zip(pk: int) -> dict[str, list[int] | list[Task]]: """Process a zip uploaded from a PACER district court The general process is to use our existing infrastructure. We open the zip, @@ -396,8 +401,8 @@ def process_recap_zip(self, pk: int) -> dict[str, list[int] | list[Task]]: :return: A list of new PQ's that were created, one per PDF that was enqueued. """ - pq = ProcessingQueue.objects.get(pk=pk) - mark_pq_status(pq, "", PROCESSING_STATUS.IN_PROGRESS) + pq = await ProcessingQueue.objects.aget(pk=pk) + await mark_pq_status(pq, "", PROCESSING_STATUS.IN_PROGRESS) logger.info("Processing RECAP zip (debug is: %s): %s", pq.debug, pq) with pq.filepath_local.open("rb") as zip_bytes: @@ -407,7 +412,7 @@ def process_recap_zip(self, pk: int) -> dict[str, list[int] | list[Task]]: for zip_info in archive.infolist(): if zip_info.file_size < max_file_size: continue - mark_pq_status( + await mark_pq_status( pq, "Zip too large; possible zip bomb. File in zip named %s " "would be %s bytes expanded." @@ -440,9 +445,9 @@ def process_recap_zip(self, pk: int) -> dict[str, list[int] | list[Task]]: pacer_doc_id = pq.pacer_doc_id # Create a new PQ and enqueue it for processing - new_pq = ProcessingQueue.objects.create( - court=pq.court, - uploader=pq.uploader, + new_pq = await ProcessingQueue.objects.acreate( + court_id=pq.court_id, + uploader_id=pq.uploader_id, pacer_case_id=pq.pacer_case_id, pacer_doc_id=pacer_doc_id, document_number=doc_num, @@ -453,10 +458,10 @@ def process_recap_zip(self, pk: int) -> dict[str, list[int] | list[Task]]: debug=pq.debug, ) new_pqs.append(new_pq.pk) - tasks.append(process_recap_pdf.delay(new_pq.pk)) + await process_recap_pdf(new_pq.pk) # At the end, mark the pq as successful and return the PQ - mark_pq_status( + await mark_pq_status( pq, f"Successfully created ProcessingQueue objects: {oxford_join(new_pqs)}", PROCESSING_STATUS.SUCCESSFUL, @@ -470,15 +475,7 @@ def process_recap_zip(self, pk: int) -> dict[str, list[int] | list[Task]]: } -@app.task( - bind=True, - autoretry_for=(requests.ConnectionError, requests.ReadTimeout), - max_retries=5, - interval_start=5 * 60, - interval_step=5 * 60, - ignore_result=True, -) -def process_recap_docket(self, pk): +async def process_recap_docket(pk): """Process an uploaded docket from the RECAP API endpoint. :param pk: The primary key of the processing queue item you want to work @@ -498,8 +495,8 @@ def process_recap_docket(self, pk): """ start_time = now() - pq = ProcessingQueue.objects.get(pk=pk) - mark_pq_status(pq, "", PROCESSING_STATUS.IN_PROGRESS) + pq = await ProcessingQueue.objects.aget(pk=pk) + await mark_pq_status(pq, "", PROCESSING_STATUS.IN_PROGRESS) logger.info(f"Processing RECAP item (debug is: {pq.debug}): {pq}") report = DocketReport(map_cl_to_pacer_id(pq.court_id)) @@ -508,21 +505,7 @@ def process_recap_docket(self, pk): text = pq.filepath_local.read().decode() except IOError as exc: msg = f"Internal processing error ({exc.errno}: {exc.strerror})." - if (self.request.retries == self.max_retries) or pq.debug: - mark_pq_status(pq, msg, PROCESSING_STATUS.FAILED) - return None - else: - mark_pq_status(pq, msg, PROCESSING_STATUS.QUEUED_FOR_RETRY) - raise self.retry(exc=exc) - - if "History/Documents" in text: - # Prior to 1.1.8, we did not separate docket history reports into their - # own upload_type. Alas, we still have some old clients around, so we - # need to handle those clients here. - pq.upload_type = UPLOAD_TYPE.DOCKET_HISTORY_REPORT - pq.save() - process_recap_docket_history_report(pk) - self.request.chain = None + await mark_pq_status(pq, msg, PROCESSING_STATUS.FAILED) return None report._parse_text(text) @@ -532,57 +515,53 @@ def process_recap_docket(self, pk): if data == {}: # Not really a docket. Some sort of invalid document (see Juriscraper). msg = "Not a valid docket upload." - mark_pq_status(pq, msg, PROCESSING_STATUS.INVALID_CONTENT) - self.request.chain = None + await mark_pq_status(pq, msg, PROCESSING_STATUS.INVALID_CONTENT) return None # Merge the contents of the docket into CL. - d = find_docket_object( + d = await find_docket_object( pq.court_id, pq.pacer_case_id, data["docket_number"] ) d.add_recap_source() - update_docket_metadata(d, data) + await sync_to_async(update_docket_metadata)(d, data) if not d.pacer_case_id: d.pacer_case_id = pq.pacer_case_id if pq.debug: - mark_pq_successful(pq, d_id=d.pk) - self.request.chain = None + await mark_pq_successful(pq, d_id=d.pk) return {"docket_pk": d.pk, "content_updated": False} - d.save() + await d.asave() # Add the HTML to the docket in case we need it someday. - pacer_file = PacerHtmlFiles( + pacer_file = await PacerHtmlFiles.objects.acreate( content_object=d, upload_type=UPLOAD_TYPE.DOCKET ) - pacer_file.filepath.save( + await sync_to_async(pacer_file.filepath.save)( "docket.html", # We only care about the ext w/S3PrivateUUIDStorageTest ContentFile(text.encode()), ) - des_returned, rds_created, content_updated = add_docket_entries( - d, data["docket_entries"] + des_returned, rds_created, content_updated = await sync_to_async( + add_docket_entries + )(d, data["docket_entries"]) + await sync_to_async(add_parties_and_attorneys)(d, data["parties"]) + await sync_to_async(process_orphan_documents)( + rds_created, pq.court_id, d.date_filed ) - add_parties_and_attorneys(d, data["parties"]) - process_orphan_documents(rds_created, pq.court_id, d.date_filed) if content_updated: newly_enqueued = enqueue_docket_alert(d.pk) if newly_enqueued: - send_alert_and_webhook(d.pk, start_time) - mark_pq_successful(pq, d_id=d.pk) + await sync_to_async(send_alert_and_webhook)(d.pk, start_time) + await mark_pq_successful(pq, d_id=d.pk) return { "docket_pk": d.pk, "content_updated": bool(rds_created or content_updated), } -@app.task( - bind=True, max_retries=3, interval_start=5 * 60, interval_step=5 * 60 -) -def process_recap_attachment( - self: Task, +async def process_recap_attachment( pk: int, tag_names: Optional[List[str]] = None, document_number: int | None = None, @@ -599,20 +578,18 @@ def process_recap_attachment( message """ - pq = ProcessingQueue.objects.get(pk=pk) - mark_pq_status(pq, "", PROCESSING_STATUS.IN_PROGRESS) + pq = await ProcessingQueue.objects.aget(pk=pk) + await mark_pq_status(pq, "", PROCESSING_STATUS.IN_PROGRESS) logger.info(f"Processing RECAP item (debug is: {pq.debug}): {pq}") try: text = pq.filepath_local.read().decode() except IOError as exc: msg = f"Internal processing error ({exc.errno}: {exc.strerror})." - if (self.request.retries == self.max_retries) or pq.debug: - pq_status, msg = mark_pq_status(pq, msg, PROCESSING_STATUS.FAILED) - return pq_status, msg, [] - else: - mark_pq_status(pq, msg, PROCESSING_STATUS.QUEUED_FOR_RETRY) - raise self.retry(exc=exc) + pq_status, msg = await mark_pq_status( + pq, msg, PROCESSING_STATUS.FAILED + ) + return pq_status, msg, [] att_data = get_data_from_att_report(text, pq.court_id) logger.info(f"Parsing completed for item {pq}") @@ -620,8 +597,7 @@ def process_recap_attachment( if att_data == {}: # Bad attachment page. msg = "Not a valid attachment page upload." - self.request.chain = None - pq_status, msg = mark_pq_status( + pq_status, msg = await mark_pq_status( pq, msg, PROCESSING_STATUS.INVALID_CONTENT ) return pq_status, msg, [] @@ -629,13 +605,14 @@ def process_recap_attachment( if pq.pacer_case_id in ["undefined", "null"]: # Bad data from the client. Fix it with parsed data. pq.pacer_case_id = att_data.get("pacer_case_id") - pq.save() + await pq.asave() if document_number is None: document_number = att_data["document_number"] try: - rds_affected, de = merge_attachment_page_data( - pq.court, + court = await Court.objects.aget(id=pq.court_id) + rds_affected, de = await sync_to_async(merge_attachment_page_data)( + court, pq.pacer_case_id, att_data["pacer_doc_id"], document_number, @@ -648,26 +625,25 @@ def process_recap_attachment( "Too many documents found when attempting to associate " "attachment data" ) - pq_status, msg = mark_pq_status(pq, msg, PROCESSING_STATUS.FAILED) + pq_status, msg = await mark_pq_status( + pq, msg, PROCESSING_STATUS.FAILED + ) return pq_status, msg, [] except RECAPDocument.DoesNotExist as exc: msg = "Could not find docket to associate with attachment metadata" - if (self.request.retries == self.max_retries) or pq.debug: - pq_status, msg = mark_pq_status(pq, msg, PROCESSING_STATUS.FAILED) - return pq_status, msg, [] - else: - mark_pq_status(pq, msg, PROCESSING_STATUS.QUEUED_FOR_RETRY) - raise self.retry(exc=exc) + pq_status, msg = await mark_pq_status( + pq, msg, PROCESSING_STATUS.FAILED + ) + raise exc - add_tags_to_objs(tag_names, rds_affected) - pq_status, msg = mark_pq_successful(pq, d_id=de.docket_id, de_id=de.pk) + await add_tags_to_objs(tag_names, rds_affected) + pq_status, msg = await mark_pq_successful( + pq, d_id=de.docket_id, de_id=de.pk + ) return pq_status, msg, rds_affected -@app.task( - bind=True, max_retries=3, interval_start=5 * 60, interval_step=5 * 60 -) -def process_recap_claims_register(self, pk): +async def process_recap_claims_register(pk): """Merge bankruptcy claims registry HTML into RECAP :param pk: The primary key of the processing queue item you want to work on @@ -675,26 +651,21 @@ def process_recap_claims_register(self, pk): :return: None :rtype: None """ - pq = ProcessingQueue.objects.get(pk=pk) + pq = await ProcessingQueue.objects.aget(pk=pk) if pq.debug: # Proper debugging not supported on this endpoint. Just abort. - mark_pq_successful(pq) - self.request.chain = None + await mark_pq_successful(pq) return None - mark_pq_status(pq, "", PROCESSING_STATUS.IN_PROGRESS) + await mark_pq_status(pq, "", PROCESSING_STATUS.IN_PROGRESS) logger.info(f"Processing RECAP item (debug is: {pq.debug}): {pq}") try: text = pq.filepath_local.read().decode() except IOError as exc: msg = f"Internal processing error ({exc.errno}: {exc.strerror})." - if (self.request.retries == self.max_retries) or pq.debug: - mark_pq_status(pq, msg, PROCESSING_STATUS.FAILED) - return None - else: - mark_pq_status(pq, msg, PROCESSING_STATUS.QUEUED_FOR_RETRY) - raise self.retry(exc=exc) + await mark_pq_status(pq, msg, PROCESSING_STATUS.FAILED) + return None report = ClaimsRegister(map_cl_to_pacer_id(pq.court_id)) report._parse_text(text) @@ -704,78 +675,63 @@ def process_recap_claims_register(self, pk): if not data: # Bad HTML msg = "Not a valid claims registry page or other parsing failure" - mark_pq_status(pq, msg, PROCESSING_STATUS.INVALID_CONTENT) - self.request.chain = None + await mark_pq_status(pq, msg, PROCESSING_STATUS.INVALID_CONTENT) return None # Merge the contents of the docket into CL. - d = find_docket_object( + d = await find_docket_object( pq.court_id, pq.pacer_case_id, data["docket_number"] ) # Merge the contents into CL d.add_recap_source() - update_docket_metadata(d, data) + await sync_to_async(update_docket_metadata)(d, data) try: - d.save() + await d.asave() except IntegrityError as exc: logger.warning( "Race condition experienced while attempting docket save." ) error_message = "Unable to save docket due to IntegrityError." - if self.request.retries == self.max_retries: - mark_pq_status(pq, error_message, PROCESSING_STATUS.FAILED) - self.request.chain = None - return None - else: - mark_pq_status( - pq, error_message, PROCESSING_STATUS.QUEUED_FOR_RETRY - ) - raise self.retry(exc=exc) + await mark_pq_status(pq, error_message, PROCESSING_STATUS.FAILED) + return None - add_bankruptcy_data_to_docket(d, data) - add_claims_to_docket(d, data["claims"]) + await sync_to_async(add_bankruptcy_data_to_docket)(d, data) + await sync_to_async(add_claims_to_docket)(d, data["claims"]) logger.info("Created/updated claims data for %s", pq) # Add the HTML to the docket in case we need it someday. - pacer_file = PacerHtmlFiles( + pacer_file = await PacerHtmlFiles.objects.acreate( content_object=d, upload_type=UPLOAD_TYPE.CLAIMS_REGISTER ) - pacer_file.filepath.save( + await sync_to_async(pacer_file.filepath.save)( # We only care about the ext w/S3PrivateUUIDStorageTest "claims_registry.html", ContentFile(text.encode()), ) - mark_pq_successful(pq, d_id=d.pk) + await mark_pq_successful(pq, d_id=d.pk) return {"docket_pk": d.pk} -@app.task( - bind=True, max_retries=3, interval_start=5 * 60, interval_step=5 * 60 -) -def process_recap_docket_history_report(self, pk): +async def process_recap_docket_history_report(pk): """Process the docket history report. :param pk: The primary key of the processing queue item you want to work on :returns: A dict indicating whether the docket needs Solr re-indexing. """ start_time = now() - pq = ProcessingQueue.objects.get(pk=pk) - mark_pq_status(pq, "", PROCESSING_STATUS.IN_PROGRESS) + pq = await ProcessingQueue.objects.aget(pk=pk) + await mark_pq_status(pq, "", PROCESSING_STATUS.IN_PROGRESS) logger.info(f"Processing RECAP item (debug is: {pq.debug}): {pq}") try: text = pq.filepath_local.read().decode() except IOError as exc: msg = f"Internal processing error ({exc.errno}: {exc.strerror})." - if (self.request.retries == self.max_retries) or pq.debug: - mark_pq_status(pq, msg, PROCESSING_STATUS.FAILED) - return None - else: - mark_pq_status(pq, msg, PROCESSING_STATUS.QUEUED_FOR_RETRY) - raise self.retry(exc=exc) + await mark_pq_status(pq, msg, PROCESSING_STATUS.FAILED) + return None report = DocketHistoryReport(map_cl_to_pacer_id(pq.court_id)) report._parse_text(text) @@ -785,45 +741,36 @@ def process_recap_docket_history_report(self, pk): if data == {}: # Bad docket history page. msg = "Not a valid docket history page upload." - mark_pq_status(pq, msg, PROCESSING_STATUS.INVALID_CONTENT) - self.request.chain = None + await mark_pq_status(pq, msg, PROCESSING_STATUS.INVALID_CONTENT) return None # Merge the contents of the docket into CL. - d = find_docket_object( + d = await find_docket_object( pq.court_id, pq.pacer_case_id, data["docket_number"] ) d.add_recap_source() - update_docket_metadata(d, data) + await sync_to_async(update_docket_metadata)(d, data) if pq.debug: - mark_pq_successful(pq, d_id=d.pk) - self.request.chain = None + await mark_pq_successful(pq, d_id=d.pk) return {"docket_pk": d.pk, "content_updated": False} try: - d.save() + await d.asave() except IntegrityError as exc: logger.warning( "Race condition experienced while attempting docket save." ) error_message = "Unable to save docket due to IntegrityError." - if self.request.retries == self.max_retries: - mark_pq_status(pq, error_message, PROCESSING_STATUS.FAILED) - self.request.chain = None - return None - else: - mark_pq_status( - pq, error_message, PROCESSING_STATUS.QUEUED_FOR_RETRY - ) - raise self.retry(exc=exc) + await mark_pq_status(pq, error_message, PROCESSING_STATUS.FAILED) + return None # Add the HTML to the docket in case we need it someday. - pacer_file = PacerHtmlFiles( + pacer_file = await PacerHtmlFiles.objects.acreate( content_object=d, upload_type=UPLOAD_TYPE.DOCKET_HISTORY_REPORT ) - pacer_file.filepath.save( + await sync_to_async(pacer_file.filepath.save)( # We only care about the ext w/S3PrivateUUIDStorageTest "docket_history.html", ContentFile(text.encode()), @@ -832,42 +779,37 @@ def process_recap_docket_history_report(self, pk): des_returned, rds_created, content_updated = add_docket_entries( d, data["docket_entries"] ) - process_orphan_documents(rds_created, pq.court_id, d.date_filed) + await sync_to_async(process_orphan_documents)( + rds_created, pq.court_id, d.date_filed + ) if content_updated: newly_enqueued = enqueue_docket_alert(d.pk) if newly_enqueued: - send_alert_and_webhook(d.pk, start_time) - mark_pq_successful(pq, d_id=d.pk) + await sync_to_async(send_alert_and_webhook)(d.pk, start_time) + await mark_pq_successful(pq, d_id=d.pk) return { "docket_pk": d.pk, "content_updated": bool(rds_created or content_updated), } -@app.task( - bind=True, max_retries=3, interval_start=5 * 60, interval_step=5 * 60 -) -def process_case_query_page(self, pk): +async def process_case_query_page(pk): """Process the case query (iquery.pl) page. :param pk: The primary key of the processing queue item you want to work on :returns: A dict indicating whether the docket needs Solr re-indexing. """ - pq = ProcessingQueue.objects.get(pk=pk) - mark_pq_status(pq, "", PROCESSING_STATUS.IN_PROGRESS) + pq = await ProcessingQueue.objects.aget(pk=pk) + await mark_pq_status(pq, "", PROCESSING_STATUS.IN_PROGRESS) logger.info(f"Processing RECAP item (debug is: {pq.debug}): {pq}") try: text = pq.filepath_local.read().decode() except IOError as exc: msg = f"Internal processing error ({exc.errno}: {exc.strerror})." - if (self.request.retries == self.max_retries) or pq.debug: - mark_pq_status(pq, msg, PROCESSING_STATUS.FAILED) - return None - else: - mark_pq_status(pq, msg, PROCESSING_STATUS.QUEUED_FOR_RETRY) - raise self.retry(exc=exc) + await mark_pq_status(pq, msg, PROCESSING_STATUS.FAILED) + return None report = CaseQuery(map_cl_to_pacer_id(pq.court_id)) report._parse_text(text) @@ -877,17 +819,16 @@ def process_case_query_page(self, pk): if data == {}: # Bad docket iquery page. msg = "Not a valid case query page upload." - mark_pq_status(pq, msg, PROCESSING_STATUS.INVALID_CONTENT) - self.request.chain = None + await mark_pq_status(pq, msg, PROCESSING_STATUS.INVALID_CONTENT) return None # Merge the contents of the docket into CL. - d = find_docket_object( + d = await find_docket_object( pq.court_id, pq.pacer_case_id, data["docket_number"] ) current_case_name = d.case_name d.add_recap_source() - update_docket_metadata(d, data) + await sync_to_async(update_docket_metadata)(d, data) # Update the docket in SOLR if the case name has changed and contains # docket entries @@ -897,47 +838,49 @@ def process_case_query_page(self, pk): content_updated = True if pq.debug: - mark_pq_successful(pq, d_id=d.pk) - self.request.chain = None + await mark_pq_successful(pq, d_id=d.pk) return {"docket_pk": d.pk, "content_updated": False} - try: - d.save() - add_bankruptcy_data_to_docket(d, data) - except IntegrityError as exc: - logger.warning( - "Race condition experienced while attempting docket save." - ) - error_message = "Unable to save docket due to IntegrityError." - if self.request.retries == self.max_retries: - mark_pq_status(pq, error_message, PROCESSING_STATUS.FAILED) - self.request.chain = None + retries = 5 + while True: + try: + await d.asave() + await sync_to_async(add_bankruptcy_data_to_docket)(d, data) + except IntegrityError as exc: + logger.warning( + "Race condition experienced while attempting docket save." + ) + error_message = "Unable to save docket due to IntegrityError." + if retries > 0: + retries -= 1 + await mark_pq_status( + pq, error_message, PROCESSING_STATUS.QUEUED_FOR_RETRY + ) + await asyncio.sleep(1) + continue + await mark_pq_status(pq, error_message, PROCESSING_STATUS.FAILED) return None else: - mark_pq_status( - pq, error_message, PROCESSING_STATUS.QUEUED_FOR_RETRY - ) - raise self.retry(exc=exc) + break # Add the HTML to the docket in case we need it someday. - pacer_file = PacerHtmlFiles( + pacer_file = await PacerHtmlFiles.objects.acreate( content_object=d, upload_type=UPLOAD_TYPE.CASE_QUERY_PAGE ) - pacer_file.filepath.save( + await sync_to_async(pacer_file.filepath.save)( # We only care about the ext w/S3PrivateUUIDStorageTest "case_report.html", ContentFile(text.encode()), ) - mark_pq_successful(pq, d_id=d.pk) + await mark_pq_successful(pq, d_id=d.pk) return { "docket_pk": d.pk, "content_updated": content_updated, } -@app.task(bind=True, max_retries=3, ignore_result=True) -def process_recap_appellate_docket(self, pk): +async def process_recap_appellate_docket(pk): """Process an uploaded appellate docket from the RECAP API endpoint. :param pk: The primary key of the processing queue item you want to work @@ -957,8 +900,8 @@ def process_recap_appellate_docket(self, pk): """ start_time = now() - pq = ProcessingQueue.objects.get(pk=pk) - mark_pq_status(pq, "", PROCESSING_STATUS.IN_PROGRESS) + pq = await ProcessingQueue.objects.aget(pk=pk) + await mark_pq_status(pq, "", PROCESSING_STATUS.IN_PROGRESS) logger.info( f"Processing Appellate RECAP item (debug is: {pq.debug}): {pq}" ) @@ -969,12 +912,8 @@ def process_recap_appellate_docket(self, pk): text = pq.filepath_local.read().decode() except IOError as exc: msg = f"Internal processing error ({exc.errno}: {exc.strerror})." - if (self.request.retries == self.max_retries) or pq.debug: - mark_pq_status(pq, msg, PROCESSING_STATUS.FAILED) - return None - else: - mark_pq_status(pq, msg, PROCESSING_STATUS.QUEUED_FOR_RETRY) - raise self.retry(exc=exc) + await mark_pq_status(pq, msg, PROCESSING_STATUS.FAILED) + return None report._parse_text(text) data = report.data @@ -983,59 +922,58 @@ def process_recap_appellate_docket(self, pk): if data == {}: # Not really a docket. Some sort of invalid document (see Juriscraper). msg = "Not a valid docket upload." - mark_pq_status(pq, msg, PROCESSING_STATUS.INVALID_CONTENT) - self.request.chain = None + await mark_pq_status(pq, msg, PROCESSING_STATUS.INVALID_CONTENT) return None # Merge the contents of the docket into CL. - d = find_docket_object( + d = await find_docket_object( pq.court_id, pq.pacer_case_id, data["docket_number"] ) d.add_recap_source() - update_docket_metadata(d, data) - d, og_info = update_docket_appellate_metadata(d, data) + await sync_to_async(update_docket_metadata)(d, data) + d, og_info = await sync_to_async(update_docket_appellate_metadata)(d, data) if not d.pacer_case_id: d.pacer_case_id = pq.pacer_case_id if pq.debug: - mark_pq_successful(pq, d_id=d.pk) - self.request.chain = None + await mark_pq_successful(pq, d_id=d.pk) return {"docket_pk": d.pk, "content_updated": False} if og_info is not None: - og_info.save() + await og_info.asave() d.originating_court_information = og_info - d.save() + await d.asave() # Add the HTML to the docket in case we need it someday. - pacer_file = PacerHtmlFiles( + pacer_file = await PacerHtmlFiles.objects.acreate( content_object=d, upload_type=UPLOAD_TYPE.APPELLATE_DOCKET ) - pacer_file.filepath.save( + await sync_to_async(pacer_file.filepath.save)( "docket.html", # We only care about the ext w/S3PrivateUUIDStorageTest ContentFile(text.encode()), ) - des_returned, rds_created, content_updated = add_docket_entries( - d, data["docket_entries"] + des_returned, rds_created, content_updated = await sync_to_async( + add_docket_entries + )(d, data["docket_entries"]) + await sync_to_async(add_parties_and_attorneys)(d, data["parties"]) + await sync_to_async(process_orphan_documents)( + rds_created, pq.court_id, d.date_filed ) - add_parties_and_attorneys(d, data["parties"]) - process_orphan_documents(rds_created, pq.court_id, d.date_filed) if content_updated: newly_enqueued = enqueue_docket_alert(d.pk) if newly_enqueued: - send_alert_and_webhook(d.pk, start_time) - mark_pq_successful(pq, d_id=d.pk) + await sync_to_async(send_alert_and_webhook)(d.pk, start_time) + await mark_pq_successful(pq, d_id=d.pk) return { "docket_pk": d.pk, "content_updated": bool(rds_created or content_updated), } -@app.task(bind=True) -def process_recap_appellate_attachment( - self: Task, pk: int +async def process_recap_appellate_attachment( + pk: int, ) -> Optional[Tuple[int, str, list[RECAPDocument]]]: """Process an uploaded appellate attachment page. @@ -1045,20 +983,18 @@ def process_recap_appellate_attachment( message and the recap documents affected. """ - pq = ProcessingQueue.objects.get(pk=pk) - mark_pq_status(pq, "", PROCESSING_STATUS.IN_PROGRESS) + pq = await ProcessingQueue.objects.aget(pk=pk) + await mark_pq_status(pq, "", PROCESSING_STATUS.IN_PROGRESS) logger.info(f"Processing RECAP item (debug is: {pq.debug}): {pq}") try: text = pq.filepath_local.read().decode() except IOError as exc: msg = f"Internal processing error ({exc.errno}: {exc.strerror})." - if (self.request.retries == self.max_retries) or pq.debug: - pq_status, msg = mark_pq_status(pq, msg, PROCESSING_STATUS.FAILED) - return pq_status, msg, [] - else: - mark_pq_status(pq, msg, PROCESSING_STATUS.QUEUED_FOR_RETRY) - raise self.retry(exc=exc) + pq_status, msg = await mark_pq_status( + pq, msg, PROCESSING_STATUS.FAILED + ) + return pq_status, msg, [] att_data = get_data_from_appellate_att_report(text, pq.court_id) logger.info(f"Parsing completed for item {pq}") @@ -1066,8 +1002,7 @@ def process_recap_appellate_attachment( if att_data == {}: # Bad attachment page. msg = "Not a valid appellate attachment page upload." - self.request.chain = None - pq_status, msg = mark_pq_status( + pq_status, msg = await mark_pq_status( pq, msg, PROCESSING_STATUS.INVALID_CONTENT ) return pq_status, msg, [] @@ -1075,11 +1010,12 @@ def process_recap_appellate_attachment( if pq.pacer_case_id in ["undefined", "null"]: # Bad data from the client. Fix it with parsed data. pq.pacer_case_id = att_data.get("pacer_case_id") - pq.save() + await pq.asave() try: - rds_affected, de = merge_attachment_page_data( - pq.court, + court = await Court.objects.aget(id=pq.court_id) + rds_affected, de = await sync_to_async(merge_attachment_page_data)( + court, pq.pacer_case_id, att_data["pacer_doc_id"], None, # Appellate attachments don't contain a document_number @@ -1092,18 +1028,20 @@ def process_recap_appellate_attachment( "Too many documents found when attempting to associate " "attachment data" ) - pq_status, msg = mark_pq_status(pq, msg, PROCESSING_STATUS.FAILED) + pq_status, msg = await mark_pq_status( + pq, msg, PROCESSING_STATUS.FAILED + ) return pq_status, msg, [] except RECAPDocument.DoesNotExist as exc: msg = "Could not find docket to associate with attachment metadata" - if (self.request.retries == self.max_retries) or pq.debug: - pq_status, msg = mark_pq_status(pq, msg, PROCESSING_STATUS.FAILED) - return pq_status, msg, [] - else: - mark_pq_status(pq, msg, PROCESSING_STATUS.QUEUED_FOR_RETRY) - raise self.retry(exc=exc) + pq_status, msg = await mark_pq_status( + pq, msg, PROCESSING_STATUS.FAILED + ) + return pq_status, msg, [] - pq_status, msg = mark_pq_successful(pq, d_id=de.docket_id, de_id=de.pk) + pq_status, msg = await mark_pq_successful( + pq, d_id=de.docket_id, de_id=de.pk + ) return pq_status, msg, rds_affected @@ -1116,7 +1054,7 @@ def process_recap_appellate_case_query_page(self, pk): """ pq = ProcessingQueue.objects.get(pk=pk) msg = "Appellate case query pages not yet supported. Coming soon." - mark_pq_status(pq, msg, PROCESSING_STATUS.FAILED) + async_to_sync(mark_pq_status)(pq, msg, PROCESSING_STATUS.FAILED) return None @@ -1129,7 +1067,7 @@ def process_recap_case_query_result_page(self, pk): """ pq = ProcessingQueue.objects.get(pk=pk) msg = "Case query result pages not yet supported. Coming soon." - mark_pq_status(pq, msg, PROCESSING_STATUS.FAILED) + async_to_sync(mark_pq_status)(pq, msg, PROCESSING_STATUS.FAILED) return None @@ -1142,7 +1080,7 @@ def process_recap_appellate_case_query_result_page(self, pk): """ pq = ProcessingQueue.objects.get(pk=pk) msg = "Appellate case query result pages not yet supported. Coming soon." - mark_pq_status(pq, msg, PROCESSING_STATUS.FAILED) + async_to_sync(mark_pq_status)(pq, msg, PROCESSING_STATUS.FAILED) return None @@ -1594,7 +1532,7 @@ def fetch_docket_by_pacer_case_id(session, court_id, pacer_case_id, fq): if fq.docket_id: d = Docket.objects.get(pk=fq.docket_id) else: - d = find_docket_object( + d = async_to_sync(find_docket_object)( court_id, pacer_case_id, docket_data["docket_number"] ) rds_created, content_updated = merge_pacer_docket_into_cl_docket( @@ -1636,7 +1574,7 @@ def fetch_docket(self, fq_pk): return None raise self.retry() - mark_pq_status(fq, "", PROCESSING_STATUS.IN_PROGRESS) + async_to_sync(mark_pq_status)(fq, "", PROCESSING_STATUS.IN_PROGRESS) cookies = get_pacer_cookie_from_cache(fq.user_id) if cookies is None: @@ -1956,7 +1894,7 @@ def download_pacer_pdf_and_save_to_pq( if not magic_number: r_msg = "No magic number available to download the document." if created: - mark_pq_status( + async_to_sync(mark_pq_status)( pq, r_msg, PROCESSING_STATUS.FAILED, "error_message" ) # Return an existing PQ object after retry or for multi-docket NEFs. @@ -2012,7 +1950,7 @@ def get_and_copy_recap_attachment_docs( # as successful and delete its filepath_local for pq in unique_pqs: if pq.status != PROCESSING_STATUS.FAILED: - mark_pq_successful(pq) + async_to_sync(mark_pq_successful)(pq) @dataclass @@ -2049,7 +1987,7 @@ def open_and_validate_email_notification( except FileNotFoundError as exc: if self.request.retries == self.max_retries: msg = "File not found." - mark_pq_status( + async_to_sync(mark_pq_status)( epq, msg, PROCESSING_STATUS.FAILED, "status_message" ) return None, "" @@ -2067,7 +2005,7 @@ def open_and_validate_email_notification( or data["dockets"][0]["docket_entries"][0]["pacer_case_id"] is None ): msg = "Not a valid notification email. No message content." - mark_pq_status( + async_to_sync(mark_pq_status)( epq, msg, PROCESSING_STATUS.INVALID_CONTENT, "status_message" ) data = None @@ -2126,7 +2064,7 @@ def get_and_merge_rd_attachments( # one time in PACER and provide the correct document_number to use for # every case when merging the attachments into each docket. main_rd_document_number = int(main_rd_local.document_number) - pq_status, msg, rds_affected = process_recap_attachment( + pq_status, msg, rds_affected = async_to_sync(process_recap_attachment)( pq_pk, document_number=main_rd_document_number ) all_attachment_rds += rds_affected @@ -2161,7 +2099,9 @@ def process_recap_email( """ epq = EmailProcessingQueue.objects.get(pk=epq_pk) - mark_pq_status(epq, "", PROCESSING_STATUS.IN_PROGRESS, "status_message") + async_to_sync(mark_pq_status)( + epq, "", PROCESSING_STATUS.IN_PROGRESS, "status_message" + ) data, body = open_and_validate_email_notification(self, epq) if data is None: self.request.chain = None @@ -2220,7 +2160,7 @@ def process_recap_email( dockets_updated = [] for docket_data in dockets: docket_entry = docket_data["docket_entries"][0] - docket = find_docket_object( + docket = async_to_sync(find_docket_object)( epq.court_id, docket_entry["pacer_case_id"], docket_data["docket_number"], @@ -2237,7 +2177,8 @@ def process_recap_email( content_object=docket, upload_type=UPLOAD_TYPE.SES_EMAIL ) pacer_file.filepath.save( - "docket.txt", # We only care about the ext w/S3PrivateUUIDStorageTest + "docket.txt", + # We only care about the ext w/S3PrivateUUIDStorageTest ContentFile(body.encode()), ) # Add docket entries for each docket @@ -2264,7 +2205,7 @@ def process_recap_email( # After properly copying the PDF to the main RECAPDocuments, # mark the PQ object as successful and delete its filepath_local if pq.status != PROCESSING_STATUS.FAILED: - mark_pq_successful(pq) + async_to_sync(mark_pq_successful)(pq) # Get NEF attachments and merge them. all_attachment_rds = [] @@ -2307,7 +2248,9 @@ def process_recap_email( rds_to_extract_add_to_solr = all_attachment_rds + all_main_rds msg = "Successful upload! Nice work." - mark_pq_status(epq, msg, PROCESSING_STATUS.SUCCESSFUL, "status_message") + async_to_sync(mark_pq_status)( + epq, msg, PROCESSING_STATUS.SUCCESSFUL, "status_message" + ) return [rd.pk for rd in rds_to_extract_add_to_solr] diff --git a/cl/recap/tests.py b/cl/recap/tests.py index b393f76461..3406d344f7 100644 --- a/cl/recap/tests.py +++ b/cl/recap/tests.py @@ -6,6 +6,7 @@ from unittest.mock import ANY import time_machine +from asgiref.sync import async_to_sync from dateutil.tz import tzutc from django.conf import settings from django.contrib.auth.hashers import make_password @@ -439,7 +440,7 @@ def test_processing_an_appellate_attachment_page(self, mock_upload): side_effect=lambda x, y: self.att_data, ): # Process the appellate attachment page containing 2 attachments. - process_recap_appellate_attachment(pq.pk) + async_to_sync(process_recap_appellate_attachment)(pq.pk) # After adding attachments, it should only exist 2 RD attachments. self.assertEqual(recap_documents.count(), 2) @@ -465,7 +466,7 @@ def test_processing_an_appellate_attachment_page(self, mock_upload): "cl.recap.tasks.get_data_from_appellate_att_report", side_effect=lambda x, y: self.att_data, ): - process_recap_appellate_attachment(pq_1.pk) + async_to_sync(process_recap_appellate_attachment)(pq_1.pk) # Process the attachment page again, no new attachments should be added self.assertEqual(recap_documents.count(), 2) @@ -509,7 +510,7 @@ def test_reprocess_appellate_docket_after_adding_attachments( "cl.recap.tasks.get_data_from_appellate_att_report", side_effect=lambda x, y: self.att_data, ): - process_recap_appellate_attachment(pq.pk) + async_to_sync(process_recap_appellate_attachment)(pq.pk) # Confirm attachments were added correctly. self.assertEqual(recap_documents.count(), 2) @@ -1112,7 +1113,7 @@ def test_debug_does_not_create_rd(self, mock_extract, mock_get_name): upload_type=UPLOAD_TYPE.PDF, debug=True, ) - process_recap_pdf(pq.pk) + async_to_sync(process_recap_pdf)(pq.pk) self.assertEqual(RECAPDocument.objects.count(), 0) mock_extract.assert_not_called() @@ -1127,7 +1128,7 @@ def test_debug_does_not_create_docket(self, add_atty_mock): upload_type=UPLOAD_TYPE.DOCKET, debug=True, ) - process_recap_docket(pq.pk) + async_to_sync(process_recap_docket)(pq.pk) self.assertEqual(Docket.objects.count(), 0) self.assertEqual(DocketEntry.objects.count(), 0) self.assertEqual(RECAPDocument.objects.count(), 0) @@ -1152,7 +1153,7 @@ def test_debug_does_not_create_recap_documents(self, mock): filepath_local=self.att, debug=True, ) - process_recap_attachment(pq.pk) + async_to_sync(process_recap_attachment)(pq.pk) self.assertEqual(Docket.objects.count(), 1) self.assertEqual(DocketEntry.objects.count(), 1) self.assertEqual(RECAPDocument.objects.count(), 1) @@ -1216,7 +1217,7 @@ def test_recap_document_already_exists(self, mock_extract): cf = ContentFile(self.file_content) self.rd.filepath_local.save(self.filename, cf) - rd = process_recap_pdf(self.pq.pk) + rd = async_to_sync(process_recap_pdf)(self.pq.pk) # Did we avoid creating new objects? self.assertEqual(rd, self.rd) @@ -1244,11 +1245,11 @@ def test_only_the_docket_already_exists(self) -> None: """ self.de.delete() with self.assertRaises(DocketEntry.DoesNotExist): - process_recap_pdf(self.pq.pk) + async_to_sync(process_recap_pdf)(self.pq.pk) self.pq.refresh_from_db() # This doesn't do the celery retries, unfortunately. If we get that # working, the correct status is PROCESSING_STATUS.FAILED. - self.assertEqual(self.pq.status, PROCESSING_STATUS.QUEUED_FOR_RETRY) + self.assertEqual(self.pq.status, PROCESSING_STATUS.FAILED) self.assertIn("Unable to find docket entry", self.pq.error_message) @mock.patch("cl.recap.tasks.extract_recap_pdf_base") @@ -1258,7 +1259,7 @@ def test_docket_and_docket_entry_already_exist(self, mock_extract): This is the good case. We simply create a new item. """ self.rd.delete() - rd = process_recap_pdf(self.pq.pk) + rd = async_to_sync(process_recap_pdf)(self.pq.pk) self.assertTrue(rd.is_available) self.assertTrue(rd.sha1) self.assertTrue(rd.filepath_local) @@ -1281,18 +1282,18 @@ def test_nothing_already_exists(self) -> None: """ self.docket.delete() with self.assertRaises(Docket.DoesNotExist): - process_recap_pdf(self.pq.pk) + async_to_sync(process_recap_pdf)(self.pq.pk) self.pq.refresh_from_db() # This doesn't do the celery retries, unfortunately. If we get that # working, the correct status is PROCESSING_STATUS.FAILED. - self.assertEqual(self.pq.status, PROCESSING_STATUS.QUEUED_FOR_RETRY) + self.assertEqual(self.pq.status, PROCESSING_STATUS.FAILED) self.assertIn("Unable to find docket", self.pq.error_message) def test_ocr_extraction_recap_document(self): """Can we extract a recap document via OCR?""" cf = ContentFile(self.file_content_ocr) self.pq.filepath_local.save(self.filename_ocr, cf) - rd = process_recap_pdf(self.pq.pk) + rd = async_to_sync(process_recap_pdf)(self.pq.pk) recap_document = RECAPDocument.objects.get(pk=rd.pk) self.assertEqual(needs_ocr(recap_document.plain_text), False) self.assertEqual(recap_document.ocr_status, RECAPDocument.OCR_COMPLETE) @@ -1355,7 +1356,7 @@ def test_simple_zip_upload(self, mock_extract): # The original pq should be marked as complete with a good message. pq = ProcessingQueue.objects.get(id=self.pq.id) print(pq.__dict__) - results = process_recap_zip(pq.pk) + results = async_to_sync(process_recap_zip)(pq.pk) pq.refresh_from_db() self.assertEqual( pq.status, @@ -1744,12 +1745,12 @@ def test_all_entries_ingested_without_duplicates(self) -> None: expected_entry_count = 23 pq = self.make_pq() - returned_data = process_recap_docket(pq.pk) + returned_data = async_to_sync(process_recap_docket)(pq.pk) d1 = Docket.objects.get(pk=returned_data["docket_pk"]) self.assertEqual(d1.docket_entries.count(), expected_entry_count) pq = self.make_pq() - returned_data = process_recap_docket(pq.pk) + returned_data = async_to_sync(process_recap_docket)(pq.pk) d2 = Docket.objects.get(pk=returned_data["docket_pk"]) self.assertEqual(d1.pk, d2.pk) self.assertEqual(d2.docket_entries.count(), expected_entry_count) @@ -1760,12 +1761,12 @@ def test_multiple_numberless_entries_multiple_times(self) -> None: """ expected_entry_count = 25 pq = self.make_pq("azd_multiple_unnumbered.html") - returned_data = process_recap_docket(pq.pk) + returned_data = async_to_sync(process_recap_docket)(pq.pk) d1 = Docket.objects.get(pk=returned_data["docket_pk"]) self.assertEqual(d1.docket_entries.count(), expected_entry_count) pq = self.make_pq("azd_multiple_unnumbered.html") - returned_data = process_recap_docket(pq.pk) + returned_data = async_to_sync(process_recap_docket)(pq.pk) d2 = Docket.objects.get(pk=returned_data["docket_pk"]) self.assertEqual(d1.pk, d2.pk) self.assertEqual(d2.docket_entries.count(), expected_entry_count) @@ -1774,7 +1775,7 @@ def test_appellate_cases_ok(self) -> None: """Do appellate cases get ordered/handled properly?""" expected_entry_count = 16 pq = self.make_pq("ca1.html", upload_type=UPLOAD_TYPE.APPELLATE_DOCKET) - returned_data = process_recap_appellate_docket(pq.pk) + returned_data = async_to_sync(process_recap_appellate_docket)(pq.pk) d1 = Docket.objects.get(pk=returned_data["docket_pk"]) self.assertEqual(d1.docket_entries.count(), expected_entry_count) @@ -1787,7 +1788,7 @@ def test_rss_feed_ingestion(self) -> None: text = f.read().decode() rss_feed._parse_text(text) docket = rss_feed.data[0] - d = find_docket_object( + d = async_to_sync(find_docket_object)( court_id, docket["pacer_case_id"], docket["docket_number"] ) update_docket_metadata(d, docket) @@ -1990,7 +1991,7 @@ def tearDown(self) -> None: def test_parsing_docket_does_not_exist(self) -> None: """Can we parse an HTML docket we have never seen before?""" - returned_data = process_recap_docket(self.pq.pk) + returned_data = async_to_sync(process_recap_docket)(self.pq.pk) d = Docket.objects.get(pk=returned_data["docket_pk"]) self.assertEqual(d.source, Docket.RECAP) self.assertTrue(d.case_name) @@ -2001,7 +2002,7 @@ def test_parsing_docket_already_exists(self) -> None: existing_d = Docket.objects.create( source=Docket.DEFAULT, pacer_case_id="asdf", court_id="scotus" ) - returned_data = process_recap_docket(self.pq.pk) + returned_data = async_to_sync(process_recap_docket)(self.pq.pk) d = Docket.objects.get(pk=returned_data["docket_pk"]) self.assertEqual(d.source, Docket.RECAP_AND_SCRAPER) self.assertTrue(d.case_name) @@ -2014,7 +2015,7 @@ def test_adding_harvard_and_recap_source(self) -> None: Docket.objects.create( source=Docket.HARVARD, pacer_case_id="asdf", court_id="scotus" ) - returned_data = process_recap_docket(self.pq.pk) + returned_data = async_to_sync(process_recap_docket)(self.pq.pk) d = Docket.objects.get(pk=returned_data["docket_pk"]) self.assertEqual(d.source, Docket.HARVARD_AND_RECAP) @@ -2026,7 +2027,7 @@ def test_docket_and_de_already_exist(self) -> None: existing_de = DocketEntry.objects.create( docket=existing_d, entry_number="1", date_filed=date(2008, 1, 1) ) - returned_data = process_recap_docket(self.pq.pk) + returned_data = async_to_sync(process_recap_docket)(self.pq.pk) d = Docket.objects.get(pk=returned_data["docket_pk"]) de = d.docket_entries.get(pk=existing_de.pk) self.assertNotEqual( @@ -2063,7 +2064,7 @@ def test_orphan_documents_are_added(self, mock) -> None: upload_type=UPLOAD_TYPE.PDF, status=PROCESSING_STATUS.FAILED, ) - process_recap_docket(self.pq.pk) + async_to_sync(process_recap_docket)(self.pq.pk) pq.refresh_from_db() self.assertEqual(pq.status, PROCESSING_STATUS.SUCCESSFUL) @@ -2098,7 +2099,9 @@ def tearDown(self) -> None: def test_parsing_docket_does_not_exist(self) -> None: """Can we parse the claims registry when the docket doesn't exist?""" - returned_data = process_recap_claims_register(self.pq.pk) + returned_data = async_to_sync(process_recap_claims_register)( + self.pq.pk + ) d = Docket.objects.get(pk=returned_data["docket_pk"]) self.assertEqual(d.source, Docket.RECAP) self.assertTrue(d.case_name) @@ -2116,7 +2119,9 @@ def test_parsing_bad_data(self) -> None: self.pq.filepath_local = f self.pq.save() - returned_data = process_recap_claims_register(self.pq.pk) + returned_data = async_to_sync(process_recap_claims_register)( + self.pq.pk + ) self.assertIsNone(returned_data) self.pq.refresh_from_db() self.assertTrue(self.pq.status, PROCESSING_STATUS.INVALID_CONTENT) @@ -2149,7 +2154,9 @@ def tearDown(self) -> None: def test_parsing_appellate_docket(self) -> None: """Can we parse an HTML docket we have never seen before?""" - returned_data = process_recap_appellate_docket(self.pq.pk) + returned_data = async_to_sync(process_recap_appellate_docket)( + self.pq.pk + ) d = Docket.objects.get(pk=returned_data["docket_pk"]) self.assertEqual(d.source, Docket.RECAP) self.assertTrue(d.case_name) @@ -2194,7 +2201,7 @@ def test_criminal_data_gets_created(self) -> None: """Does the criminal data appear in the DB properly when we process the docket? """ - process_recap_docket(self.pq.pk) + async_to_sync(process_recap_docket)(self.pq.pk) expected_criminal_count_count = 1 self.assertEqual( expected_criminal_count_count, CriminalCount.objects.count() @@ -2243,7 +2250,7 @@ def tearDown(self) -> None: def test_attachments_get_created(self, mock): """Do attachments get created if we have a RECAPDocument to match on?""" - process_recap_attachment(self.pq.pk) + async_to_sync(process_recap_attachment)(self.pq.pk) num_attachments_to_create = 3 self.assertEqual( RECAPDocument.objects.filter( @@ -2258,11 +2265,11 @@ def test_no_rd_match(self, mock): """If there's no RECAPDocument to match on, do we fail gracefully?""" RECAPDocument.objects.all().delete() with self.assertRaises(RECAPDocument.DoesNotExist): - process_recap_attachment(self.pq.pk) + async_to_sync(process_recap_attachment)(self.pq.pk) self.pq.refresh_from_db() # This doesn't do the celery retries, unfortunately. If we get that # working, the correct status is PROCESSING_STATUS.FAILED. - self.assertEqual(self.pq.status, PROCESSING_STATUS.QUEUED_FOR_RETRY) + self.assertEqual(self.pq.status, PROCESSING_STATUS.FAILED) class RecapUploadAuthenticationTest(TestCase): @@ -4360,7 +4367,7 @@ def test_extract_missed_recap_documents(self): self.assertEqual(len(rd_needs_extraction_after), 0) @mock.patch( - "cl.lib.microservice_utils.models.fields.files.FieldFile.open", + "django.db.models.fields.files.FieldFile.open", side_effect=lambda mode: exec("raise FileNotFoundError"), ) def test_clean_up_recap_document_file(self, mock_open): @@ -6283,7 +6290,7 @@ def test_case_id_and_docket_number_core_lookup(self): properly. """ - d = find_docket_object( + d = async_to_sync(find_docket_object)( self.court.pk, "12345", self.docket_data["docket_number"] ) update_docket_metadata(d, self.docket_data) @@ -6298,7 +6305,7 @@ def test_case_id_and_docket_number_core_lookup(self): def test_case_id_lookup(self): """Confirm if lookup by only pacer_case_id works properly.""" - d = find_docket_object( + d = async_to_sync(find_docket_object)( self.court.pk, "54321", self.docket_data["docket_number"] ) update_docket_metadata(d, self.docket_data) @@ -6313,7 +6320,7 @@ def test_case_id_lookup(self): def test_docket_number_core_lookup(self): """Confirm if lookup by only docket_number_core works properly.""" - d = find_docket_object( + d = async_to_sync(find_docket_object)( self.court.pk, self.docket_core_data["docket_entries"][0]["pacer_case_id"], self.docket_core_data["docket_number"], @@ -6330,7 +6337,7 @@ def test_docket_number_core_lookup(self): def test_docket_number_lookup(self): """Confirm if lookup by only docket_number works properly.""" - d = find_docket_object( + d = async_to_sync(find_docket_object)( self.court.pk, self.docket_no_core_data["docket_entries"][0]["pacer_case_id"], self.docket_no_core_data["docket_number"], @@ -6349,7 +6356,7 @@ def test_avoid_overwrite_docket_by_number_core(self): docket_number_core in the same court, but they are different dockets? """ - d = find_docket_object( + d = async_to_sync(find_docket_object)( self.court.pk, self.docket_data["docket_entries"][0]["pacer_case_id"], self.docket_data["docket_number"], @@ -6377,7 +6384,7 @@ def test_avoid_overwrite_docket_by_number_core_multiple_results(self): pacer_case_id=None, ) - d = find_docket_object( + d = async_to_sync(find_docket_object)( self.court.pk, self.docket_data["docket_entries"][0]["pacer_case_id"], self.docket_data["docket_number"], @@ -6411,7 +6418,7 @@ def test_lookup_by_normalized_docket_number_case(self): RECAPEmailDocketEntryDataFactory(pacer_case_id="1234568") ], ) - new_d = find_docket_object( + new_d = async_to_sync(find_docket_object)( self.court_appellate.pk, docket_data_lower_number["docket_entries"][0]["pacer_case_id"], docket_data_lower_number["docket_number"], diff --git a/cl/recap/views.py b/cl/recap/views.py index d51dee50a2..1b400318a3 100644 --- a/cl/recap/views.py +++ b/cl/recap/views.py @@ -1,3 +1,4 @@ +from asgiref.sync import async_to_sync, sync_to_async from django.contrib.auth.models import User from rest_framework.exceptions import ValidationError from rest_framework.permissions import IsAuthenticatedOrReadOnly @@ -49,9 +50,10 @@ class PacerProcessingQueueViewSet(LoggingMixin, ModelViewSet): "date_modified", ) - def perform_create(self, serializer): - pq = serializer.save(uploader=self.request.user) - process_recap_upload(pq) + @async_to_sync + async def perform_create(self, serializer): + pq = await sync_to_async(serializer.save)(uploader=self.request.user) + await process_recap_upload(pq) class EmailProcessingQueueViewSet(LoggingMixin, ModelViewSet): diff --git a/cl/recap_rss/tasks.py b/cl/recap_rss/tasks.py index 50f6ba5008..aa7e24e203 100644 --- a/cl/recap_rss/tasks.py +++ b/cl/recap_rss/tasks.py @@ -8,6 +8,7 @@ from typing import Optional import requests +from asgiref.sync import async_to_sync from celery import Task from dateparser import parse from django.core.files.base import ContentFile @@ -289,19 +290,19 @@ def hash_item(item): return item_hash -def is_cached(item_hash): +async def is_cached(item_hash): """Check if a hash is in the RSS Item Cache""" - return RssItemCache.objects.filter(hash=item_hash).exists() + return await RssItemCache.objects.filter(hash=item_hash).aexists() -def cache_hash(item_hash): +async def cache_hash(item_hash): """Add a new hash to the RSS Item Cache :param item_hash: A SHA1 hash you wish to cache. :returns True if successful, False if not. """ try: - RssItemCache.objects.create(hash=item_hash) + await RssItemCache.objects.acreate(hash=item_hash) except IntegrityError: # Happens during race conditions or when you try to cache something # that's already in there. @@ -330,7 +331,7 @@ def merge_rss_feed_contents(self, feed_data, court_pk, metadata_only=False): d_pks_to_alert = [] for docket in feed_data: item_hash = hash_item(docket) - if is_cached(item_hash): + if async_to_sync(is_cached)(item_hash): continue with transaction.atomic(): @@ -339,7 +340,7 @@ def merge_rss_feed_contents(self, feed_data, court_pk, metadata_only=False): # The item is already in the cache, ergo it's getting processed # in another thread/process and we had a race condition. continue - d = find_docket_object( + d = async_to_sync(find_docket_object)( court_pk, docket["pacer_case_id"], docket["docket_number"] ) diff --git a/cl/scrapers/tasks.py b/cl/scrapers/tasks.py index f5f84552a3..a4cdd62b43 100644 --- a/cl/scrapers/tasks.py +++ b/cl/scrapers/tasks.py @@ -4,9 +4,11 @@ from typing import List, Optional, Tuple, Union import requests +from asgiref.sync import async_to_sync, sync_to_async from django.apps import apps from django.conf import settings from django.core.files.base import ContentFile +from httpx import Response from juriscraper.lib.exceptions import PacerLoginException from juriscraper.pacer import CaseQuery, PacerSession from redis import ConnectionError as RedisConnectionError @@ -121,11 +123,11 @@ def extract_doc_content( opinion = Opinion.objects.get(pk=pk) # Try to extract opinion content without using OCR. - response = microservice( + response = async_to_sync(microservice)( service="document-extract", item=opinion, ) - if not response.ok: + if not response.is_success: logging.warning( f"Error from document-extract microservice: {response.status_code}" ) @@ -140,12 +142,12 @@ def extract_doc_content( and needs_ocr(content) and ".pdf" in str(opinion.local_path) ): - response = microservice( + response = async_to_sync(microservice)( service="document-extract-ocr", item=opinion, params={"ocr_available": ocr_available}, ) - if response.ok: + if response.is_success: content = response.json()["content"] extracted_by_ocr = True @@ -232,10 +234,12 @@ def extract_recap_pdf( :return: A list of processed RECAPDocument """ - return extract_recap_pdf_base(pks, ocr_available, check_if_needed) + return async_to_sync(extract_recap_pdf_base)( + pks, ocr_available, check_if_needed + ) -def extract_recap_pdf_base( +async def extract_recap_pdf_base( pks: Union[int, List[int]], ocr_available: bool = True, check_if_needed: bool = True, @@ -255,30 +259,30 @@ def extract_recap_pdf_base( processed = [] for pk in pks: - rd = RECAPDocument.objects.get(pk=pk) + rd = await RECAPDocument.objects.aget(pk=pk) if check_if_needed and not rd.needs_extraction: # Early abort if the item doesn't need extraction and the user # hasn't disabled early abortion. processed.append(pk) continue - response = microservice( + response = await microservice( service="document-extract", item=rd, ) - if not response.ok: + if not response.is_success: continue content = response.json()["content"] extracted_by_ocr = response.json()["extracted_by_ocr"] ocr_needed = needs_ocr(content) if ocr_available and ocr_needed: - response = microservice( + response = await microservice( service="document-extract-ocr", item=rd, params={"ocr_available": ocr_available}, ) - if response.ok: + if response.is_success: content = response.json()["content"] extracted_by_ocr = True @@ -296,7 +300,7 @@ def extract_recap_pdf_base( rd.plain_text, _ = anonymize(content) # Do not do indexing here. Creates race condition in celery. - rd.save(index=False, do_extraction=False) + await rd.asave(index=False, do_extraction=False) processed.append(pk) return processed @@ -337,7 +341,7 @@ def process_audio_file(self, pk) -> None: "case_name_short": audio_obj.case_name_short, "download_url": audio_obj.download_url, } - audio_response = microservice( + audio_response: Response = async_to_sync(microservice)( service="convert-audio", item=audio_obj, params=audio_data, @@ -348,7 +352,7 @@ def process_audio_file(self, pk) -> None: audio_obj.file_with_date = audio_obj.docket.date_argued audio_obj.local_path_mp3.save(file_name, cf, save=False) audio_obj.duration = float( - microservice( + async_to_sync(microservice)( service="audio-duration", file=audio_response.content, file_type="mp3", diff --git a/cl/scrapers/tests.py b/cl/scrapers/tests.py index 143197705d..4f7dfafe54 100644 --- a/cl/scrapers/tests.py +++ b/cl/scrapers/tests.py @@ -3,6 +3,7 @@ from http import HTTPStatus from pathlib import Path +from asgiref.sync import async_to_sync from django.conf import settings from django.core.files.base import ContentFile from django.utils.timezone import now @@ -33,7 +34,7 @@ def setUpTestData(cls) -> None: cls.court = CourtFactory(id="test", jurisdiction="F") def test_extension(self): - r = microservice( + r = async_to_sync(microservice)( service="buffer-extension", params={"mime": True}, ) @@ -512,7 +513,7 @@ def test_audio_conversion(self) -> None: "case_name_short": audio_obj.case_name_short, "download_url": audio_obj.download_url, } - audio_response = microservice( + audio_response = async_to_sync(microservice)( service="convert-audio", item=audio_obj, params=audio_data, diff --git a/cl/scrapers/utils.py b/cl/scrapers/utils.py index 01ed5e9033..b2b3a0af08 100644 --- a/cl/scrapers/utils.py +++ b/cl/scrapers/utils.py @@ -6,6 +6,7 @@ from urllib.parse import urljoin import requests +from asgiref.sync import async_to_sync from django.conf import settings from django.db.models import QuerySet from juriscraper.AbstractSite import logger @@ -38,7 +39,7 @@ def test_for_meta_redirections(r: Response) -> Tuple[bool, Optional[str]]: :param r: A response object :return: A boolean and value """ - extension = microservice( + extension = async_to_sync(microservice)( service="buffer-extension", file=r.content, params={"mime": True}, @@ -88,7 +89,7 @@ def follow_redirections(r: Response, s: Session) -> Response: ) def get_extension(content: bytes) -> str: """A handful of workarounds for getting extensions we can trust.""" - return microservice( + return async_to_sync(microservice)( service="buffer-extension", file=content, ).text @@ -231,7 +232,7 @@ def update_or_create_docket( :param ia_needs_upload: If the docket needs upload to IA, default None. :return: The docket docket. """ - docket = find_docket_object(court_id, None, docket_number) + docket = async_to_sync(find_docket_object)(court_id, None, docket_number) if docket.pk: docket.case_name = case_name docket.case_name_short = case_name_short diff --git a/cl/search/models.py b/cl/search/models.py index d04587edef..cdd3290999 100644 --- a/cl/search/models.py +++ b/cl/search/models.py @@ -4,6 +4,7 @@ import pghistory import pytz +from asgiref.sync import sync_to_async from celery.canvas import chain from django.contrib.contenttypes.fields import GenericRelation from django.core.exceptions import ValidationError @@ -1453,6 +1454,22 @@ def save( if len(tasks) > 0: chain(*tasks)() + async def asave( + self, + update_fields=None, + do_extraction=False, + index=False, + *args, + **kwargs, + ): + return await sync_to_async(self.save)( + update_fields=update_fields, + do_extraction=do_extraction, + index=index, + *args, + **kwargs, + ) + def delete(self, *args, **kwargs): """ Note that this doesn't get called when an entire queryset diff --git a/cl/search/tests.py b/cl/search/tests.py index 4e09ce1d83..630bb6e5da 100644 --- a/cl/search/tests.py +++ b/cl/search/tests.py @@ -15,7 +15,7 @@ from django.core.management import call_command from django.db import IntegrityError, transaction from django.http import HttpRequest -from django.test import RequestFactory, override_settings +from django.test import AsyncRequestFactory, override_settings from django.urls import reverse from factory import RelatedFactory from lxml import etree, html @@ -1165,7 +1165,7 @@ def setUp(self) -> None: "--noinput", ] call_command("cl_update_index", *args) - self.factory = RequestFactory() + self.factory = AsyncRequestFactory() def test_grouped_queries(self) -> None: """When we have a cluster with multiple opinions, do results get diff --git a/cl/simple_pages/urls.py b/cl/simple_pages/urls.py index a473345c19..90ecf3491e 100644 --- a/cl/simple_pages/urls.py +++ b/cl/simple_pages/urls.py @@ -30,25 +30,25 @@ path("faq/", faq, name="faq"), path("feeds/", feeds, name="feeds_info"), path("podcasts/", podcasts, name="podcasts"), - path("contribute/", contribute, name="contribute"), + path("contribute/", contribute, name="contribute"), # type: ignore[arg-type] path("contact/", contact, name="contact"), - path("contact/thanks/", contact_thanks, name="contact_thanks"), + path("contact/thanks/", contact_thanks, name="contact_thanks"), # type: ignore[arg-type] # Help pages - path("help/", help_home, name="help_home"), + path("help/", help_home, name="help_home"), # type: ignore[arg-type] path("help/coverage/", coverage_graph, name="coverage"), path( "help/coverage/financial-disclosures/", coverage_fds, name="coverage_fds", ), - path("help/markdown/", markdown_help, name="markdown_help"), + path("help/markdown/", markdown_help, name="markdown_help"), # type: ignore[arg-type] path("help/alerts/", alert_help, name="alert_help"), - path("help/donations/", donation_help, name="donation_help"), - path("help/delete-account/", delete_help, name="delete_help"), - path("help/tags-notes/", tag_notes_help, name="tag_notes_help"), - path("help/search-operators/", advanced_search, name="advanced_search"), - path("help/recap/email/", recap_email_help, name="recap_email_help"), - path("help/broken-email/", broken_email_help, name="broken_email_help"), + path("help/donations/", donation_help, name="donation_help"), # type: ignore[arg-type] + path("help/delete-account/", delete_help, name="delete_help"), # type: ignore[arg-type] + path("help/tags-notes/", tag_notes_help, name="tag_notes_help"), # type: ignore[arg-type] + path("help/search-operators/", advanced_search, name="advanced_search"), # type: ignore[arg-type] + path("help/recap/email/", recap_email_help, name="recap_email_help"), # type: ignore[arg-type] + path("help/broken-email/", broken_email_help, name="broken_email_help"), # type: ignore[arg-type] # Added 2018-10-23 path( "search/advanced-techniques/", @@ -64,10 +64,10 @@ "coverage/financial-disclosures/", RedirectView.as_view(pattern_name="coverage_fds", permanent=True), ), - path("terms/v//", old_terms, name="old_terms"), - path("terms/", latest_terms, name="terms"), + path("terms/v//", old_terms, name="old_terms"), # type: ignore[arg-type] + path("terms/", latest_terms, name="terms"), # type: ignore[arg-type] # Robots path("robots.txt", robots, name="robots"), # SEO-related stuff - path("mywot8f5568174e171ff0acff.html", validate_for_wot), + path("mywot8f5568174e171ff0acff.html", validate_for_wot), # type: ignore[arg-type] ] diff --git a/cl/simple_pages/views.py b/cl/simple_pages/views.py index e3a06c3980..a8f871450e 100644 --- a/cl/simple_pages/views.py +++ b/cl/simple_pages/views.py @@ -45,7 +45,7 @@ logger = logging.getLogger(__name__) -def about(request: HttpRequest) -> HttpResponse: +async def about(request: HttpRequest) -> HttpResponse: """Loads the about page""" return TemplateResponse(request, "about.html", {"private": False}) @@ -80,7 +80,7 @@ def faq(request: HttpRequest) -> HttpResponse: ) -def help_home(request: HttpRequest) -> HttpResponse: +async def help_home(request: HttpRequest) -> HttpResponse: return TemplateResponse(request, "help/index.html", {"private": False}) @@ -119,35 +119,35 @@ def alert_help(request: HttpRequest) -> HttpResponse: return TemplateResponse(request, "help/alert_help.html", context) -def donation_help(request: HttpRequest) -> HttpResponse: +async def donation_help(request: HttpRequest) -> HttpResponse: return TemplateResponse( request, "help/donation_help.html", {"private": False} ) -def delete_help(request: HttpRequest) -> HttpResponse: +async def delete_help(request: HttpRequest) -> HttpResponse: return TemplateResponse( request, "help/delete_account_help.html", {"private": False} ) -def markdown_help(request: HttpRequest) -> HttpResponse: +async def markdown_help(request: HttpRequest) -> HttpResponse: return TemplateResponse( request, "help/markdown_help.html", {"private": False} ) -def tag_notes_help(request: HttpRequest) -> HttpResponse: +async def tag_notes_help(request: HttpRequest) -> HttpResponse: return TemplateResponse(request, "help/tags_help.html", {"private": False}) -def recap_email_help(request: HttpRequest) -> HttpResponse: +async def recap_email_help(request: HttpRequest) -> HttpResponse: return TemplateResponse( request, "help/recap_email_help.html", {"private": False} ) -def broken_email_help(request: HttpRequest) -> HttpResponse: +async def broken_email_help(request: HttpRequest) -> HttpResponse: return TemplateResponse( request, "help/broken_email_help.html", @@ -306,7 +306,7 @@ def podcasts(request: HttpRequest) -> HttpResponse: ) -def contribute(request: HttpRequest) -> HttpResponse: +async def contribute(request: HttpRequest) -> HttpResponse: return TemplateResponse(request, "contribute.html", {"private": False}) @@ -373,17 +373,17 @@ def contact( return TemplateResponse(request, template_path, template_data) -def contact_thanks(request: HttpRequest) -> HttpResponse: +async def contact_thanks(request: HttpRequest) -> HttpResponse: return TemplateResponse(request, "contact_thanks.html", {"private": True}) -def advanced_search(request: HttpRequest) -> HttpResponse: +async def advanced_search(request: HttpRequest) -> HttpResponse: return TemplateResponse( request, "help/advanced_search.html", {"private": False} ) -def old_terms(request: HttpRequest, v: str) -> HttpResponse: +async def old_terms(request: HttpRequest, v: str) -> HttpResponse: return TemplateResponse( request, f"terms/{v}.html", @@ -395,7 +395,7 @@ def old_terms(request: HttpRequest, v: str) -> HttpResponse: ) -def latest_terms(request: HttpRequest) -> HttpResponse: +async def latest_terms(request: HttpRequest) -> HttpResponse: return TemplateResponse( request, "terms/latest.html", @@ -415,11 +415,13 @@ def robots(request: HttpRequest) -> HttpResponse: return response -def validate_for_wot(request: HttpRequest) -> HttpResponse: +async def validate_for_wot(request: HttpRequest) -> HttpResponse: return HttpResponse("bcb982d1e23b7091d5cf4e46826c8fc0") -def ratelimited(request: HttpRequest, exception: Exception) -> HttpResponse: +async def ratelimited( + request: HttpRequest, exception: Exception +) -> HttpResponse: return TemplateResponse( request, "429.html", diff --git a/cl/users/views.py b/cl/users/views.py index e884d3d832..0e32d08ec6 100644 --- a/cl/users/views.py +++ b/cl/users/views.py @@ -418,7 +418,7 @@ def delete_account(request: AuthenticatedHttpRequest) -> HttpResponse: ) -def delete_profile_done(request: HttpRequest) -> HttpResponse: +async def delete_profile_done(request: HttpRequest) -> HttpResponse: return TemplateResponse(request, "profile/deleted.html", {"private": True}) @@ -442,7 +442,7 @@ def take_out(request: AuthenticatedHttpRequest) -> HttpResponse: ) -def take_out_done(request: HttpRequest) -> HttpResponse: +async def take_out_done(request: HttpRequest) -> HttpResponse: return TemplateResponse( request, "profile/take_out_done.html", diff --git a/poetry.lock b/poetry.lock index 7f91780e6a..663839364c 100644 --- a/poetry.lock +++ b/poetry.lock @@ -1610,6 +1610,32 @@ files = [ {file = "h11-0.13.0.tar.gz", hash = "sha256:70813c1135087a248a4d38cc0e1a0181ffab2188141a93eaf567940c3957ff06"}, ] +[[package]] +name = "h2" +version = "4.1.0" +description = "HTTP/2 State-Machine based protocol implementation" +optional = false +python-versions = ">=3.6.1" +files = [ + {file = "h2-4.1.0-py3-none-any.whl", hash = "sha256:03a46bcf682256c95b5fd9e9a99c1323584c3eec6440d379b9903d709476bc6d"}, + {file = "h2-4.1.0.tar.gz", hash = "sha256:a83aca08fbe7aacb79fec788c9c0bac936343560ed9ec18b82a13a12c28d2abb"}, +] + +[package.dependencies] +hpack = ">=4.0,<5" +hyperframe = ">=6.0,<7" + +[[package]] +name = "hpack" +version = "4.0.0" +description = "Pure-Python HPACK header compression" +optional = false +python-versions = ">=3.6.1" +files = [ + {file = "hpack-4.0.0-py3-none-any.whl", hash = "sha256:84a076fad3dc9a9f8063ccb8041ef100867b1878b25ef0ee63847a5d53818a6c"}, + {file = "hpack-4.0.0.tar.gz", hash = "sha256:fc41de0c63e687ebffde81187a948221294896f6bdc0ae2312708df339430095"}, +] + [[package]] name = "html5lib" version = "1.1" @@ -1631,6 +1657,27 @@ chardet = ["chardet (>=2.2)"] genshi = ["genshi"] lxml = ["lxml"] +[[package]] +name = "httpcore" +version = "0.17.2" +description = "A minimal low-level HTTP client." +optional = false +python-versions = ">=3.7" +files = [ + {file = "httpcore-0.17.2-py3-none-any.whl", hash = "sha256:5581b9c12379c4288fe70f43c710d16060c10080617001e6b22a3b6dbcbefd36"}, + {file = "httpcore-0.17.2.tar.gz", hash = "sha256:125f8375ab60036db632f34f4b627a9ad085048eef7cb7d2616fea0f739f98af"}, +] + +[package.dependencies] +anyio = ">=3.0,<5.0" +certifi = "*" +h11 = ">=0.13,<0.15" +sniffio = "==1.*" + +[package.extras] +http2 = ["h2 (>=3,<5)"] +socks = ["socksio (==1.*)"] + [[package]] name = "httplib2" version = "0.22.0" @@ -1698,6 +1745,41 @@ files = [ [package.extras] test = ["Cython (>=0.29.24,<0.30.0)"] +[[package]] +name = "httpx" +version = "0.24.1" +description = "The next generation HTTP client." +optional = false +python-versions = ">=3.7" +files = [ + {file = "httpx-0.24.1-py3-none-any.whl", hash = "sha256:06781eb9ac53cde990577af654bd990a4949de37a28bdb4a230d434f3a30b9bd"}, + {file = "httpx-0.24.1.tar.gz", hash = "sha256:5853a43053df830c20f8110c5e69fe44d035d850b2dfe795e196f00fdb774bdd"}, +] + +[package.dependencies] +certifi = "*" +h2 = {version = ">=3,<5", optional = true, markers = "extra == \"http2\""} +httpcore = ">=0.15.0,<0.18.0" +idna = "*" +sniffio = "*" + +[package.extras] +brotli = ["brotli", "brotlicffi"] +cli = ["click (==8.*)", "pygments (==2.*)", "rich (>=10,<14)"] +http2 = ["h2 (>=3,<5)"] +socks = ["socksio (==1.*)"] + +[[package]] +name = "hyperframe" +version = "6.0.1" +description = "HTTP/2 framing layer for Python" +optional = false +python-versions = ">=3.6.1" +files = [ + {file = "hyperframe-6.0.1-py3-none-any.whl", hash = "sha256:0ec6bafd80d8ad2195c4f03aacba3a8265e57bc4cff261e802bf39970ed02a15"}, + {file = "hyperframe-6.0.1.tar.gz", hash = "sha256:ae510046231dc8e9ecb1a6586f63d2347bf4c8905914aa84ba585ae85f28a914"}, +] + [[package]] name = "hyperlink" version = "21.0.0" @@ -4576,4 +4658,4 @@ testing = ["coverage (>=5.0.3)", "zope.event", "zope.testing"] [metadata] lock-version = "2.0" python-versions = ">=3.11, <3.12" -content-hash = "674af32861e1e5bf9c31401f02a3af0b698be8b60b9492cd89ab5464218efd3e" +content-hash = "c772ce322a5df2a3542a385836fe500b979b84cfc8b5cd998f54a17ce6cb2889" diff --git a/pyproject.toml b/pyproject.toml index 91020cf1e0..fa3712d8e8 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -108,6 +108,7 @@ types-dateparser = "^1.1.4.6" juriscraper = "^2.5.49" uvicorn = {extras = ["standard"], version = "^0.22.0"} daphne = "^4.0.0" +httpx = {extras = ["http2"], version = "^0.24.1"} [tool.poetry.group.dev.dependencies]