Convert some views to native async. #2720

Merged · 1 commit · Jul 20, 2023
10 changes: 5 additions & 5 deletions cl/api/views.py
@@ -85,7 +85,7 @@ def api_index(request: HttpRequest) -> HttpResponse:
     )


-def replication_docs(request: HttpRequest) -> HttpResponse:
+async def replication_docs(request: HttpRequest) -> HttpResponse:
     return render(request, "replication.html", {"private": False})


@@ -159,7 +159,7 @@ def coverage_data(request, version, court):
     )


-def get_result_count(request, version, day_count):
+async def get_result_count(request, version, day_count):
     """Get the count of results for the past `day_count` number of days

     GET parameters will be a complete search string
@@ -199,7 +199,7 @@ def get_result_count(request, version, day_count):
     return JsonResponse({"count": response.result.numFound}, safe=True)


-def deprecated_api(request, v):
+async def deprecated_api(request, v):
     return JsonResponse(
         {
             "meta": {
@@ -213,12 +213,12 @@ def deprecated_api(request, v):
     )


-def webhooks_getting_started(request):
+async def webhooks_getting_started(request):
     context = {"private": False}
     return render(request, "webhooks-getting-started.html", context)


-def webhooks_docs(request, version=None):
+async def webhooks_docs(request, version=None):
     """Show the correct version of the webhooks docs"""

     context = {"private": False}
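
Note: the conversions in cl/api/views.py follow Django's plain async-view pattern: each function becomes `async def`, and because `render` and `JsonResponse` only build a response object (no blocking ORM or network work), they can be called directly from the event loop. A minimal sketch of that pattern, with a hypothetical view name and template:

```python
# Minimal sketch of the async-view pattern used above; the view name,
# template, and context are hypothetical, not part of this PR.
from django.http import HttpRequest, HttpResponse
from django.shortcuts import render


async def example_docs(request: HttpRequest) -> HttpResponse:
    # No blocking ORM or network calls here, so no sync/async bridge is
    # needed; Django serves this coroutine natively under ASGI.
    return render(request, "example.html", {"private": False})
```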
35 changes: 19 additions & 16 deletions cl/corpus_importer/management/commands/troller_bk.py
@@ -12,6 +12,7 @@
 from typing import Any, DefaultDict, Mapping, TypedDict
 from urllib.parse import unquote

+from asgiref.sync import async_to_sync, sync_to_async
 from dateutil.parser import ParserError
 from django.db import DataError, IntegrityError, transaction
 from django.db.models import Q
@@ -45,7 +46,7 @@
 FILES_BUFFER_THRESHOLD = 3


-def check_for_early_termination(
+async def check_for_early_termination(
     court_id: str, docket: dict[str, Any]
 ) -> str | None:
     """Check for early termination, skip the rest of the file in case a cached
@@ -58,13 +59,13 @@ def check_for_early_termination(
     omitted, "continue" if only the current item should be omitted or None.
     """
     item_hash = hash_item(docket)
-    if is_cached(item_hash):
+    if await is_cached(item_hash):
         logger.info(
             f"Hit a cached item, finishing adding bulk entries for {court_id} feed. "
         )
         return "break"

-    cache_hash(item_hash)
+    await cache_hash(item_hash)
     if (
         not docket["pacer_case_id"]
         and not docket["docket_number"]
@@ -228,7 +229,7 @@ def get_rds_to_add(
     return rds_to_create_bulk


-def merge_rss_data(
+async def merge_rss_data(
     feed_data: list[dict[str, Any]],
     court_id: str,
     build_date: datetime | None,
@@ -242,7 +243,7 @@ def merge_rss_data(
     """

     court_id = map_pacer_to_cl_id(court_id)
-    court = Court.objects.get(pk=court_id)
+    court = await Court.objects.aget(pk=court_id)
     dockets_created = 0
     all_rds_created: list[int] = []
     district_court_ids = (
@@ -255,7 +256,7 @@
         build_date
         and build_date
         > make_aware(datetime(year=2018, month=4, day=20), timezone.utc)
-        and court_id in district_court_ids
+        and await district_court_ids.filter(id=court_id).aexists()
         and court_id not in courts_exceptions_no_rss
     ):
         # Avoid parsing/adding feeds after we start scraping RSS Feeds for
@@ -269,13 +270,13 @@
         str, list[dict[str, Any]]
     ] = defaultdict(list)
     for docket in feed_data:
-        skip_or_break = check_for_early_termination(court_id, docket)
+        skip_or_break = await check_for_early_termination(court_id, docket)
         if skip_or_break == "continue":
             continue
         elif skip_or_break == "break":
             break

-        d = find_docket_object(
+        d = await find_docket_object(
             court_id,
             docket["pacer_case_id"],
             docket["docket_number"],
@@ -285,7 +286,9 @@
         if (
             document_number
             and d.pk
-            and d.docket_entries.filter(entry_number=document_number).exists()
+            and await d.docket_entries.filter(
+                entry_number=document_number
+            ).aexists()
         ):
             # It's an existing docket entry; let's not add it.
             continue
@@ -301,11 +304,11 @@
         )
         if (
             d.pk
-            and d.docket_entries.filter(
+            and await d.docket_entries.filter(
                 query,
                 date_filed=docket_entry["date_filed"],
                 entry_number=docket_entry["document_number"],
-            ).exists()
+            ).aexists()
         ):
             # It's an existing docket entry; let's not add it.
             continue
@@ -322,7 +325,7 @@
                 # court and doesn't have a pacer_case_id
                 continue

-            add_new_docket_from_rss(
+            await sync_to_async(add_new_docket_from_rss)(
                 court_id,
                 d,
                 docket,
@@ -338,15 +341,15 @@
             # docket entry to add in bulk.
             des_to_add_existing_docket.append((d.pk, docket_entry))
             try:
-                d.save(update_fields=["source"])
-                add_bankruptcy_data_to_docket(d, docket)
+                await d.asave(update_fields=["source"])
+                await sync_to_async(add_bankruptcy_data_to_docket)(d, docket)
             except (DataError, IntegrityError) as exc:
                 # Trouble. Log and move on
                 logger.warn(
                     f"Got DataError or IntegrityError while saving docket."
                 )

-    rds_created_pks, dockets_created = do_bulk_additions(
+    rds_created_pks, dockets_created = await sync_to_async(do_bulk_additions)(
         court_id,
         unique_dockets,
         dockets_to_create,
@@ -601,7 +604,7 @@ def iterate_and_import_files(
                 f"Skipping: {item_path=} with {court_id=} due to incorrect date format. \n"
             )
             continue
-        rds_for_solr, dockets_created = merge_rss_data(
+        rds_for_solr, dockets_created = async_to_sync(merge_rss_data)(
             feed_data, court_id, build_date
         )
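
Note: two idioms carry the conversion in troller_bk.py. Inside the now-async functions, Django's async ORM methods (`aget`, `aexists`, `asave`) replace their sync counterparts, while helpers that stay synchronous are wrapped with `sync_to_async`; the sync management-command path then calls the async `merge_rss_data` through `async_to_sync`. A condensed sketch of the idiom, using hypothetical helper names around an assumed `Docket` model import:

```python
# Condensed sketch of the sync/async bridging used in this file; the helper
# names below are hypothetical stand-ins, not code from this PR.
from asgiref.sync import async_to_sync, sync_to_async

from cl.search.models import Docket  # assumed import path


def update_stats(d: Docket) -> None:
    """A helper that stays synchronous (e.g. heavy, transactional ORM work)."""
    ...


async def refresh_docket(pk: int) -> None:
    # Native async ORM calls replace .get()/.exists()/.save().
    d = await Docket.objects.aget(pk=pk)
    if await d.docket_entries.filter(entry_number=1).aexists():
        return
    await d.asave(update_fields=["source"])
    # Helpers that are still sync get wrapped rather than rewritten.
    await sync_to_async(update_stats)(d)


def handle(pk: int) -> None:
    # Sync entry points (management commands, Celery tasks) bridge back in.
    async_to_sync(refresh_docket)(pk)
```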
25 changes: 15 additions & 10 deletions cl/corpus_importer/tasks.py
@@ -9,6 +9,7 @@

 import internetarchive as ia
 import requests
+from asgiref.sync import async_to_sync
 from celery import Task
 from celery.exceptions import SoftTimeLimitExceeded
 from django.conf import settings
@@ -664,7 +665,9 @@ def get_and_process_free_pdf(

     # Get the data temporarily. OCR is done for all nightly free
     # docs in a separate batch, but may as well do the easy ones.
-    extract_recap_pdf_base(rd.pk, ocr_available=False, check_if_needed=False)
+    async_to_sync(extract_recap_pdf_base)(
+        rd.pk, ocr_available=False, check_if_needed=False
+    )
     return {"result": result, "rd_pk": rd.pk}


@@ -1056,7 +1059,7 @@ def do_case_query_by_pacer_case_id(

     # Merge the contents into CL.
     if d is None:
-        d = find_docket_object(
+        d = async_to_sync(find_docket_object)(
             court_id, pacer_case_id, docket_data["docket_number"]
         )

@@ -1184,7 +1187,7 @@ def make_docket_by_iquery(
         )
         return None

-    d = find_docket_object(
+    d = async_to_sync(find_docket_object)(
         court_id,
         str(pacer_case_id),
         report.data["docket_number"],
@@ -1287,7 +1290,7 @@ def get_docket_by_pacer_case_id(
         return None

     if d is None:
-        d = find_docket_object(
+        d = async_to_sync(find_docket_object)(
             court_id, pacer_case_id, docket_data["docket_number"]
         )

@@ -1365,7 +1368,7 @@ def get_appellate_docket_by_docket_number(
         d = None

     if d is None:
-        d = find_docket_object(court_id, docket_number, docket_number)
+        d = async_to_sync(find_docket_object)(
+            court_id, docket_number, docket_number
+        )

     rds_created, content_updated = merge_pacer_docket_into_cl_docket(
         d,
@@ -1676,12 +1681,12 @@ def get_document_number_for_appellate(
         pdf_bytes = local_path.read()
     if pdf_bytes:
         # For other jurisdictions try first to get it from the PDF document.
-        dn_response = microservice(
+        dn_response = async_to_sync(microservice)(
             service="document-number",
             file_type="pdf",
             file=pdf_bytes,
         )
-        if dn_response.ok and dn_response.text:
+        if dn_response.is_success and dn_response.text:
             document_number = dn_response.text

     if not document_number and pacer_doc_id:
@@ -1783,11 +1788,11 @@ def update_rd_metadata(
     # request.content is sometimes a str, sometimes unicode, so
     # force it all to be bytes, pleasing hashlib.
     rd.sha1 = sha1(pdf_bytes)
-    response = microservice(
+    response = async_to_sync(microservice)(
         service="page-count",
         item=rd,
     )
-    if response.ok:
+    if response.is_success:
         rd.page_count = response.text

     # Save and extract, skipping OCR.
@@ -1978,7 +1983,7 @@ def get_pacer_doc_by_rd_and_description(
         return

     # Skip OCR for now. It'll happen in a second step.
-    extract_recap_pdf_base(rd.pk, ocr_available=False)
+    async_to_sync(extract_recap_pdf_base)(rd.pk, ocr_available=False)
     add_items_to_solr([rd.pk], "search.RECAPDocument")
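
Note: the Celery tasks in tasks.py stay synchronous, so the async helpers (`find_docket_object`, `microservice`, `extract_recap_pdf_base`) are invoked through `async_to_sync`. The `ok` to `is_success` change suggests the `microservice` helper now returns an httpx-style response, where `is_success` is the 2xx check that replaces requests' `ok`. A sketch of the calling pattern, with a hypothetical task, helper, and service URL:

```python
# Sketch of calling an async helper from a sync Celery task; the task name,
# the page_count_service helper, and the URL are hypothetical.
import httpx
from asgiref.sync import async_to_sync
from celery import shared_task


async def page_count_service(pdf_bytes: bytes) -> httpx.Response:
    # Stand-in for an async microservice client returning an httpx.Response
    # (hence `.is_success` rather than requests' `.ok`).
    async with httpx.AsyncClient() as client:
        return await client.post("http://doctor/page-count", content=pdf_bytes)


@shared_task
def set_page_count(pdf_bytes: bytes) -> int | None:
    # Celery tasks run in a sync context, so bridge with async_to_sync.
    response = async_to_sync(page_count_service)(pdf_bytes)
    if response.is_success:  # httpx: True for any 2xx status
        return int(response.text)
    return None
```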