Convert some views to native async.
ttys0dev committed Jun 9, 2023
1 parent 3a00d08 commit 0f99c3a
Showing 24 changed files with 651 additions and 570 deletions.
10 changes: 5 additions & 5 deletions cl/api/views.py
@@ -85,7 +85,7 @@ def api_index(request: HttpRequest) -> HttpResponse:
)


-def replication_docs(request: HttpRequest) -> HttpResponse:
+async def replication_docs(request: HttpRequest) -> HttpResponse:
return render(request, "replication.html", {"private": False})


@@ -159,7 +159,7 @@ def coverage_data(request, version, court):
)


-def get_result_count(request, version, day_count):
+async def get_result_count(request, version, day_count):
"""Get the count of results for the past `day_count` number of days
GET parameters will be a complete search string
@@ -199,7 +199,7 @@ def get_result_count(request, version, day_count):
return JsonResponse({"count": response.result.numFound}, safe=True)


-def deprecated_api(request, v):
+async def deprecated_api(request, v):
return JsonResponse(
{
"meta": {
@@ -213,12 +213,12 @@ def deprecated_api(request, v):
)


-def webhooks_getting_started(request):
+async def webhooks_getting_started(request):
context = {"private": False}
return render(request, "webhooks-getting-started.html", context)


-def webhooks_docs(request, version=None):
+async def webhooks_docs(request, version=None):
"""Show the correct version of the webhooks docs"""

context = {"private": False}
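Note: the view changes above use Django's native async view support. Declaring a view with async def is enough for Django (3.1+) to run it as a coroutine under ASGI, and render/JsonResponse still work because they touch no blocking resources. A minimal sketch of the pattern, with the view names and template invented for illustration:

from django.http import HttpRequest, HttpResponse, JsonResponse
from django.shortcuts import render


async def example_docs(request: HttpRequest) -> HttpResponse:
    # Django detects the coroutine and awaits it on the ASGI event loop;
    # under WSGI it runs the view in a one-off event loop instead.
    return render(request, "example.html", {"private": False})


async def example_status(request: HttpRequest) -> HttpResponse:
    # Responses that hit no database or network need no awaits at all.
    return JsonResponse({"meta": {"status": "ok"}})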
35 changes: 19 additions & 16 deletions cl/corpus_importer/management/commands/troller_bk.py
@@ -12,6 +12,7 @@
from typing import Any, DefaultDict, Mapping, TypedDict
from urllib.parse import unquote

+from asgiref.sync import async_to_sync, sync_to_async
from dateutil.parser import ParserError
from django.db import DataError, IntegrityError, transaction
from django.db.models import Q
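Note: the new asgiref import brings in the two bridging helpers used throughout this file. sync_to_async wraps a blocking callable so it can be awaited from a coroutine (it runs in a worker thread), and async_to_sync drives a coroutine to completion from synchronous code. A rough sketch of how they compose, with fetch_feed and parse_feed as made-up stand-ins:

from asgiref.sync import async_to_sync, sync_to_async


def parse_feed(raw: bytes) -> dict:
    # Blocking helper, stand-in for real parsing code.
    return {"entries": raw.decode().splitlines()}


async def fetch_feed(raw: bytes) -> dict:
    # Awaiting the wrapped callable keeps the event loop unblocked.
    return await sync_to_async(parse_feed)(raw)


def run() -> None:
    # From sync code, async_to_sync owns (or reuses) an event loop.
    print(async_to_sync(fetch_feed)(b"one\ntwo"))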
@@ -45,7 +46,7 @@
FILES_BUFFER_THRESHOLD = 3


-def check_for_early_termination(
+async def check_for_early_termination(
court_id: str, docket: dict[str, Any]
) -> str | None:
"""Check for early termination, skip the rest of the file in case a cached
@@ -58,13 +59,13 @@ def check_for_early_termination(
omitted, "continue" if only the current item should be omitted or None.
"""
item_hash = hash_item(docket)
-if is_cached(item_hash):
+if await is_cached(item_hash):
logger.info(
f"Hit a cached item, finishing adding bulk entries for {court_id} feed. "
)
return "break"

-cache_hash(item_hash)
+await cache_hash(item_hash)
if (
not docket["pacer_case_id"]
and not docket["docket_number"]
@@ -228,7 +229,7 @@ def get_rds_to_add(
return rds_to_create_bulk


-def merge_rss_data(
+async def merge_rss_data(
feed_data: list[dict[str, Any]],
court_id: str,
build_date: datetime | None,
@@ -242,7 +243,7 @@
"""

court_id = map_pacer_to_cl_id(court_id)
-court = Court.objects.get(pk=court_id)
+court = await Court.objects.aget(pk=court_id)
dockets_created = 0
all_rds_created: list[int] = []
district_court_ids = (
@@ -255,7 +256,7 @@
build_date
and build_date
> make_aware(datetime(year=2018, month=4, day=20), timezone.utc)
-and court_id in district_court_ids
+and await district_court_ids.filter(id=court_id).aexists()
and court_id not in courts_exceptions_no_rss
):
# Avoid parsing/adding feeds after we start scraping RSS Feeds for
@@ -269,13 +270,13 @@
str, list[dict[str, Any]]
] = defaultdict(list)
for docket in feed_data:
-skip_or_break = check_for_early_termination(court_id, docket)
+skip_or_break = await check_for_early_termination(court_id, docket)
if skip_or_break == "continue":
continue
elif skip_or_break == "break":
break

-d = find_docket_object(
+d = await find_docket_object(
court_id,
docket["pacer_case_id"],
docket["docket_number"],
@@ -285,7 +286,9 @@
if (
document_number
and d.pk
-and d.docket_entries.filter(entry_number=document_number).exists()
+and await d.docket_entries.filter(
+    entry_number=document_number
+).aexists()
):
# It's an existing docket entry; let's not add it.
continue
@@ -301,11 +304,11 @@
)
if (
d.pk
-and d.docket_entries.filter(
+and await d.docket_entries.filter(
query,
date_filed=docket_entry["date_filed"],
entry_number=docket_entry["document_number"],
-).exists()
+).aexists()
):
# It's an existing docket entry; let's not add it.
continue
Expand All @@ -322,7 +325,7 @@ def merge_rss_data(
# court and doesn't have a pacer_case_id
continue

-add_new_docket_from_rss(
+await sync_to_async(add_new_docket_from_rss)(
court_id,
d,
docket,
@@ -338,15 +341,15 @@
# docket entry to add in bulk.
des_to_add_existing_docket.append((d.pk, docket_entry))
try:
-d.save(update_fields=["source"])
-add_bankruptcy_data_to_docket(d, docket)
+await d.asave(update_fields=["source"])
+await sync_to_async(add_bankruptcy_data_to_docket)(d, docket)
except (DataError, IntegrityError) as exc:
# Trouble. Log and move on
logger.warn(
f"Got DataError or IntegrityError while saving docket."
)

-rds_created_pks, dockets_created = do_bulk_additions(
+rds_created_pks, dockets_created = await sync_to_async(do_bulk_additions)(
court_id,
unique_dockets,
dockets_to_create,
@@ -601,7 +604,7 @@ def iterate_and_import_files(
f"Skipping: {item_path=} with {court_id=} due to incorrect date format. \n"
)
continue
-rds_for_solr, dockets_created = merge_rss_data(
+rds_for_solr, dockets_created = async_to_sync(merge_rss_data)(
feed_data, court_id, build_date
)

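Note: inside merge_rss_data the ORM calls switch to Django's async queryset API. aget, aexists, and asave (Django 4.1+) are the awaitable counterparts of get, exists, and save, while helpers that still run sync ORM queries are wrapped in sync_to_async. A simplified sketch of that mix; the import path, model fields, and helper are assumed for illustration, not taken from this diff:

from asgiref.sync import sync_to_async

from cl.search.models import Court, Docket  # assumed import path


def add_extra_data(docket: Docket, data: dict) -> None:
    # Stand-in for a helper that still runs sync ORM queries internally.
    ...


async def upsert_docket(court_id: str, docket_number: str, data: dict) -> Docket:
    court = await Court.objects.aget(pk=court_id)  # awaitable .get()

    exists = await Docket.objects.filter(
        court=court, docket_number=docket_number
    ).aexists()  # awaitable .exists()

    docket = Docket(court=court, docket_number=docket_number)
    if not exists:
        await docket.asave()  # awaitable .save()

    # Sync helpers run in a worker thread so the event loop stays free.
    await sync_to_async(add_extra_data)(docket, data)
    return docket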
25 changes: 15 additions & 10 deletions cl/corpus_importer/tasks.py
@@ -9,6 +9,7 @@

import internetarchive as ia
import requests
+from asgiref.sync import async_to_sync
from celery import Task
from celery.exceptions import SoftTimeLimitExceeded
from django.conf import settings
@@ -664,7 +665,9 @@ def get_and_process_free_pdf(

# Get the data temporarily. OCR is done for all nightly free
# docs in a separate batch, but may as well do the easy ones.
-extract_recap_pdf_base(rd.pk, ocr_available=False, check_if_needed=False)
+async_to_sync(extract_recap_pdf_base)(
+    rd.pk, ocr_available=False, check_if_needed=False
+)
return {"result": result, "rd_pk": rd.pk}


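Note: Celery tasks like get_and_process_free_pdf remain synchronous, so coroutines such as extract_recap_pdf_base (and find_docket_object and microservice further down) are called through async_to_sync, which runs the coroutine to completion on an event loop it manages. A minimal sketch with an invented task and coroutine:

from asgiref.sync import async_to_sync
from celery import shared_task


async def extract_pdf(pk: int, ocr_available: bool = False) -> dict:
    # Stand-in coroutine for the real extraction helper.
    return {"pk": pk, "ocr": ocr_available}


@shared_task
def process_free_pdf(pk: int) -> dict:
    # The worker invokes the task synchronously, so the coroutine is
    # bridged with async_to_sync rather than awaited.
    return async_to_sync(extract_pdf)(pk, ocr_available=False)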
@@ -1056,7 +1059,7 @@ def do_case_query_by_pacer_case_id(

# Merge the contents into CL.
if d is None:
-d = find_docket_object(
+d = async_to_sync(find_docket_object)(
court_id, pacer_case_id, docket_data["docket_number"]
)

@@ -1184,7 +1187,7 @@ def make_docket_by_iquery(
)
return None

-d = find_docket_object(
+d = async_to_sync(find_docket_object)(
court_id,
str(pacer_case_id),
report.data["docket_number"],
@@ -1287,7 +1290,7 @@ def get_docket_by_pacer_case_id(
return None

if d is None:
-d = find_docket_object(
+d = async_to_sync(find_docket_object)(
court_id, pacer_case_id, docket_data["docket_number"]
)

@@ -1365,7 +1368,9 @@ def get_appellate_docket_by_docket_number(
d = None

if d is None:
-d = find_docket_object(court_id, docket_number, docket_number)
+d = async_to_sync(find_docket_object)(
+    court_id, docket_number, docket_number
+)

rds_created, content_updated = merge_pacer_docket_into_cl_docket(
d,
@@ -1676,12 +1681,12 @@ def get_document_number_for_appellate(
pdf_bytes = local_path.read()
if pdf_bytes:
# For other jurisdictions try first to get it from the PDF document.
-dn_response = microservice(
+dn_response = async_to_sync(microservice)(
service="document-number",
file_type="pdf",
file=pdf_bytes,
)
-if dn_response.ok and dn_response.text:
+if dn_response.is_success and dn_response.text:
document_number = dn_response.text

if not document_number and pacer_doc_id:
Expand Down Expand Up @@ -1783,11 +1788,11 @@ def update_rd_metadata(
# request.content is sometimes a str, sometimes unicode, so
# force it all to be bytes, pleasing hashlib.
rd.sha1 = sha1(pdf_bytes)
-response = microservice(
+response = async_to_sync(microservice)(
service="page-count",
item=rd,
)
-if response.ok:
+if response.is_success:
rd.page_count = response.text

# Save and extract, skipping OCR.
Expand Down Expand Up @@ -1978,7 +1983,7 @@ def get_pacer_doc_by_rd_and_description(
return

# Skip OCR for now. It'll happen in a second step.
-extract_recap_pdf_base(rd.pk, ocr_available=False)
+async_to_sync(extract_recap_pdf_base)(rd.pk, ocr_available=False)
add_items_to_solr([rd.pk], "search.RECAPDocument")


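Note: the response checks change from ok to is_success, which suggests microservice now returns an httpx-style response: httpx exposes is_success for 2xx statuses, where requests uses ok. A hedged sketch of such an async helper; the endpoint, payload shape, and helper signature are invented for illustration:

import httpx


async def microservice_example(service: str, file: bytes | None = None) -> httpx.Response:
    url = f"http://example-service/{service}"  # hypothetical endpoint
    async with httpx.AsyncClient() as client:
        files = {"file": ("document.pdf", file)} if file else None
        return await client.post(url, files=files)


async def get_page_count(pdf_bytes: bytes) -> int | None:
    response = await microservice_example(service="page-count", file=pdf_bytes)
    # httpx: is_success is True for any 2xx status (requests' near-equivalent is .ok).
    if response.is_success:
        return int(response.text)
    return None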