Skip to content

Commit

Permalink
(BSR)[API] feat: public api: add @atomic to products module
Browse files Browse the repository at this point in the history
Add @atomic to routes and some nested functions called by them.
  • Loading branch information
jbaudet-pass committed Jan 9, 2025
1 parent 50551a6 commit e6af3e8
Show file tree
Hide file tree
Showing 12 changed files with 184 additions and 137 deletions.
5 changes: 4 additions & 1 deletion api/src/pcapi/core/bookings/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -560,7 +560,10 @@ def _cancel_booking(
technical_message_id="booking.cancelled",
)
batch.track_booking_cancellation(booking)
external_bookings_api.send_booking_notification_to_external_service(booking, BookingAction.CANCEL)

on_commit(
partial(external_bookings_api.send_booking_notification_to_external_service, booking, BookingAction.CANCEL)
)

update_external_user(booking.user)
update_external_pro(booking.venue.bookingEmail)
Expand Down
5 changes: 4 additions & 1 deletion api/src/pcapi/core/external/batch.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from datetime import datetime
from functools import partial
import logging
import textwrap

Expand All @@ -9,6 +10,7 @@
import pcapi.core.fraud.models as fraud_models
import pcapi.core.offers.models as offers_models
import pcapi.notifications.push as push_notifications
from pcapi.repository import on_commit
from pcapi.tasks import batch_tasks


Expand Down Expand Up @@ -167,7 +169,8 @@ def track_booking_cancellation(booking: bookings_models.Booking) -> None:
"offer_price": booking.total_amount,
}
payload = batch_tasks.TrackBatchEventRequest(event_name=event_name, event_payload=event_payload, user_id=user.id)
batch_tasks.track_event_task.delay(payload)

on_commit(partial(batch_tasks.track_event_task.delay, payload))


def format_offer_attributes(offer: offers_models.Offer) -> dict:
Expand Down
3 changes: 2 additions & 1 deletion api/src/pcapi/core/external/compliance.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,8 @@
logger = logging.getLogger(__name__)


def update_offer_compliance_score(offer: offers_models.Offer, is_primary: bool) -> None:
def update_offer_compliance_score(offer_id: int, is_primary: bool) -> None:
offer = offers_models.Offer.query.get(offer_id)
payload = _get_payload_for_compliance_api(offer)
if is_primary:
compliance_tasks.update_offer_compliance_score_primary_task.delay(payload)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@
from pcapi.utils.date import format_time_in_second_to_human_readable


def send_email_for_each_ongoing_booking(offer: Offer) -> None:
def send_email_for_each_ongoing_booking(offer_id: int) -> None:
offer = Offer.query.get(offer_id)

ongoing_bookings = (
bookings_models.Booking.query.join(bookings_models.Booking.stock)
.join(Stock.offer)
Expand Down
80 changes: 25 additions & 55 deletions api/src/pcapi/core/offers/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -443,9 +443,10 @@ def update_offer(
withdrawal_updated = updates_set & withdrawal_fields
oa_updated = "offererAddress" in updates
if should_send_mail and (withdrawal_updated or oa_updated):
transactional_mails.send_email_for_each_ongoing_booking(offer)
on_commit(partial(transactional_mails.send_email_for_each_ongoing_booking, offer.id))

reason = search.IndexationReason.OFFER_UPDATE

search.async_index_offer_ids([offer.id], reason=reason, log_extra={"changes": updates_set})

return offer
Expand Down Expand Up @@ -591,13 +592,10 @@ def batch_update_offers(query: BaseQuery, update_fields: dict, send_email_notifi
else:
db.session.commit()

on_commit(
partial(
search.async_index_offer_ids,
offer_ids_batch,
reason=search.IndexationReason.OFFER_BATCH_UPDATE,
log_extra={"changes": set(update_fields.keys())},
),
search.async_index_offer_ids(
offer_ids_batch,
reason=search.IndexationReason.OFFER_BATCH_UPDATE,
log_extra={"changes": set(update_fields.keys())},
)

withdrawal_updated = {"withdrawalDetails", "withdrawalType", "withdrawalDelay"}.intersection(
Expand All @@ -608,7 +606,7 @@ def batch_update_offers(query: BaseQuery, update_fields: dict, send_email_notifi
on_commit(
partial(
transactional_mails.send_email_for_each_ongoing_booking,
offer,
offer.id,
),
)

Expand Down Expand Up @@ -825,10 +823,8 @@ def create_stock(
offer.lastValidationPrice = price
repository.add_to_session(created_stock, *created_activation_codes, offer)
db.session.flush()
search.async_index_offer_ids(
[offer.id],
reason=search.IndexationReason.STOCK_CREATION,
)

search.async_index_offer_ids([offer.id], reason=search.IndexationReason.STOCK_CREATION)

return created_stock

Expand Down Expand Up @@ -916,6 +912,7 @@ def edit_stock(
finance_api.update_finance_event_pricing_date(stock)

repository.add_to_session(stock)

search.async_index_offer_ids(
[stock.offerId],
reason=search.IndexationReason.STOCK_UPDATE,
Expand Down Expand Up @@ -976,10 +973,7 @@ def publish_offer(
else:
if offer.publicationDate:
offers_repository.delete_future_offer(offer.id)
search.async_index_offer_ids(
[offer.id],
reason=search.IndexationReason.OFFER_PUBLICATION,
)
search.async_index_offer_ids([offer.id], reason=search.IndexationReason.OFFER_PUBLICATION)
logger.info(
"Offer has been published",
extra={"offer_id": offer.id, "venue_id": offer.venueId, "offer_status": offer.status},
Expand Down Expand Up @@ -1037,6 +1031,7 @@ def _delete_stock(stock: models.Stock, author_id: int | None = None, user_connec
if cancelled_bookings:
for booking in cancelled_bookings:
transactional_mails.send_booking_cancellation_by_pro_to_beneficiary_email(booking)

transactional_mails.send_booking_cancellation_confirmation_by_pro_email(cancelled_bookings)
if not FeatureToggle.WIP_DISABLE_CANCEL_BOOKING_NOTIFICATION.is_active():
on_commit(
Expand All @@ -1046,13 +1041,7 @@ def _delete_stock(stock: models.Stock, author_id: int | None = None, user_connec
)
)

on_commit(
partial(
search.async_index_offer_ids,
[stock.offerId],
reason=search.IndexationReason.STOCK_DELETION,
)
)
search.async_index_offer_ids([stock.offerId], reason=search.IndexationReason.STOCK_DELETION)


def delete_stock(stock: models.Stock, author_id: int | None = None, user_connect_as: bool | None = None) -> None:
Expand Down Expand Up @@ -1106,13 +1095,7 @@ def create_mediation(
)
_delete_mediations_and_thumbs(previous_mediations)

on_commit(
partial(
search.async_index_offer_ids,
[offer.id],
reason=search.IndexationReason.MEDIATION_CREATION,
),
)
search.async_index_offer_ids([offer.id], reason=search.IndexationReason.MEDIATION_CREATION)

return mediation

Expand Down Expand Up @@ -1204,13 +1187,8 @@ def add_criteria_to_offers(
db.session.bulk_save_objects(offer_criteria)
db.session.flush()

on_commit(
partial(
search.async_index_offer_ids,
offer_ids,
reason=search.IndexationReason.CRITERIA_LINK,
log_extra={"criterion_ids": criterion_ids},
),
search.async_index_offer_ids(
offer_ids, reason=search.IndexationReason.CRITERIA_LINK, log_extra={"criterion_ids": criterion_ids}
)

return True
Expand Down Expand Up @@ -1310,13 +1288,10 @@ def reject_inappropriate_products(
users_models.Favorite.query.filter(users_models.Favorite.offerId.in_(offer_ids)).delete(
synchronize_session=False
)
on_commit(
partial(
search.async_index_offer_ids,
offer_ids,
reason=search.IndexationReason.PRODUCT_REJECTION,
log_extra={"eans": eans},
),
search.async_index_offer_ids(
offer_ids,
reason=search.IndexationReason.PRODUCT_REJECTION,
log_extra={"eans": eans},
)

return True
Expand Down Expand Up @@ -1384,10 +1359,10 @@ def set_offer_status_based_on_fraud_criteria(offer: AnyOffer) -> models.OfferVal
status = models.OfferValidationStatus.PENDING
offer.flaggingValidationRules = flagging_rules
if isinstance(offer, models.Offer):
compliance.update_offer_compliance_score(offer, is_primary=True)
on_commit(partial(compliance.update_offer_compliance_score, offer.id, is_primary=True))
else:
if isinstance(offer, models.Offer):
compliance.update_offer_compliance_score(offer, is_primary=False)
on_commit(partial(compliance.update_offer_compliance_score, offer.id, is_primary=False))

logger.info("Computed offer validation", extra={"offer": offer.id, "status": status.value})
return status
Expand Down Expand Up @@ -2022,17 +1997,12 @@ def move_event_offer(

db.session.add(booking)

on_commit(
partial(
search.async_index_offer_ids,
{offer_id},
reason=search.IndexationReason.OFFER_UPDATE,
log_extra={"changes": {"venueId"}},
)
search.async_index_offer_ids(
{offer_id}, reason=search.IndexationReason.OFFER_UPDATE, log_extra={"changes": {"venueId"}}
)

if notify_beneficiary:
on_commit(partial(transactional_mails.send_email_for_each_ongoing_booking, offer))
on_commit(partial(transactional_mails.send_email_for_each_ongoing_booking, offer.id))


def move_collective_offer_venue(
Expand Down
26 changes: 18 additions & 8 deletions api/src/pcapi/core/search/__init__.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from collections import abc
import datetime
import enum
from functools import partial
import logging
import typing

Expand All @@ -19,6 +20,7 @@
from pcapi.core.search.backends import base
from pcapi.models import db
from pcapi.models.feature import FeatureToggle
from pcapi.repository import on_commit
from pcapi.utils import requests
from pcapi.utils.module_loading import import_string

Expand Down Expand Up @@ -132,14 +134,22 @@ def async_index_offer_ids(
This function returns quickly. The "real" reindexation will be
done later through a cron job.
"""
_log_async_request("offers", offer_ids, reason, log_extra)
backend = _get_backend()
try:
backend.enqueue_offer_ids(offer_ids)
except Exception: # pylint: disable=broad-except
if not settings.CATCH_INDEXATION_EXCEPTIONS:
raise
logger.exception("Could not enqueue offer ids to index", extra={"offers": offer_ids})

def enqueue(
    offer_ids: abc.Collection[int],
    reason: IndexationReason,
    log_extra: dict | None = None,
) -> None:
    # Deferred body of async_index_offer_ids, intended to run via
    # on_commit(partial(...)) so offers are only queued for reindexation
    # after the surrounding transaction commits.
    _log_async_request("offers", offer_ids, reason, log_extra)
    backend = _get_backend()
    try:
        backend.enqueue_offer_ids(offer_ids)
    except Exception: # pylint: disable=broad-except
        # Re-raise unless the deployment explicitly opted into swallowing
        # indexation errors (settings.CATCH_INDEXATION_EXCEPTIONS); in that
        # case log the failure with the affected offer ids and carry on.
        if not settings.CATCH_INDEXATION_EXCEPTIONS:
            raise
        logger.exception("Could not enqueue offer ids to index", extra={"offers": offer_ids})

on_commit(partial(enqueue, offer_ids, reason, log_extra))


def async_index_collective_offer_template_ids(
Expand Down
Loading

0 comments on commit e6af3e8

Please sign in to comment.