-
Notifications
You must be signed in to change notification settings - Fork 24
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
users: Send emails to Brevo instead of Mailjet
- Loading branch information
Showing
5 changed files
with
841 additions
and
534 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,203 @@ | ||
import enum | ||
import logging | ||
import time | ||
|
||
import httpx | ||
from allauth.account.models import EmailAddress | ||
from django.conf import settings | ||
from django.db.models import Exists, OuterRef, Q | ||
from sentry_sdk.crons import monitor | ||
|
||
from itou.companies.enums import SIAE_WITH_CONVENTION_KINDS | ||
from itou.companies.models import CompanyMembership | ||
from itou.prescribers.enums import PrescriberOrganizationKind | ||
from itou.prescribers.models import PrescriberMembership | ||
from itou.users.enums import IdentityProvider, UserKind | ||
from itou.users.models import User | ||
from itou.utils.command import BaseCommand | ||
from itou.utils.iterators import chunks | ||
|
||
|
||
logger = logging.getLogger(__name__) | ||
|
||
# https://app.brevo.com/contact/list-listing | ||
BREVO_LIST_ID = 31 | ||
BREVO_API_URL = "https://api.brevo.com/v3" | ||
|
||
|
||
class UserCategory(enum.Enum): | ||
PRECSRIBER_FT = "prescripteur FT" | ||
PRESCRIBER = "prescripteur habilité" | ||
ORIENTEUR = "orienteur" | ||
EMPLOYEUR = "employeur" | ||
|
||
|
||
class BrevoClient: | ||
IMPORT_BATCH_SIZE = 1000 | ||
DELETE_BATCH_SIZE = 150 # see https://developers.brevo.com/reference/removecontactfromlist | ||
EXPORT_BATCH_SIZE = 500 # see https://developers.brevo.com/reference/getcontactsfromlist | ||
|
||
def __init__(self): | ||
self.client = httpx.Client( | ||
headers={ | ||
"accept": "application/json", | ||
"api-key": settings.BREVO_API_KEY, | ||
} | ||
) | ||
|
||
def _import_contacts(self, users_data, category): | ||
data = [ | ||
{ | ||
"email": user["email"], | ||
"attributes": { | ||
"prenom": user["first_name"].title(), | ||
"nom": user["last_name"].upper(), | ||
"date_inscription": user["date_joined"].strftime("%Y-%m-%d"), | ||
"type": category.value, | ||
}, | ||
} | ||
for user in users_data | ||
] | ||
|
||
response = self.client.post( | ||
f"{BREVO_API_URL}/contacts/import", | ||
headers={"Content-Type": "application/json"}, | ||
json={ | ||
"listIds": [BREVO_LIST_ID], | ||
"emailBlacklist": False, | ||
"smsBlacklist": False, | ||
"updateExistingContacts": True, | ||
"emptyContactsAttributes": False, | ||
"jsonBody": data, | ||
}, | ||
) | ||
if response.status_code != 202: | ||
logger.error("Brevo API: Some emails were not imported", exc_info=True) | ||
|
||
def import_users(self, users, category): | ||
for chunk in chunks(users, self.IMPORT_BATCH_SIZE): | ||
if chunk: | ||
self._import_contacts(chunk, category) | ||
time.sleep(1) | ||
|
||
def _get_contacts_from_list(self, limit, offset): | ||
response = self.client.get( | ||
f"{BREVO_API_URL}/contacts/lists/{BREVO_LIST_ID}/contacts?limit={limit}&offset={offset}" | ||
) | ||
if response.status_code != 200: | ||
return [], 0 | ||
data = response.json() | ||
return [user_data["email"] for user_data in data["contacts"]], data["count"] | ||
|
||
def all_emails(self): | ||
emails = [] | ||
limit = self.EXPORT_BATCH_SIZE | ||
count = 0 | ||
offset = 0 | ||
while True: | ||
batch, new_count = self._get_contacts_from_list(limit, offset) | ||
emails += batch | ||
count = count or new_count # set count | ||
offset += limit | ||
if offset >= count: | ||
break | ||
return emails | ||
|
||
def _remove_contacts_from_list(self, emails): | ||
response = self.client.post( | ||
f"{BREVO_API_URL}/contacts/lists/{BREVO_LIST_ID}/contacts/remove", | ||
headers={"Content-Type": "application/json"}, | ||
json={"emails": emails}, | ||
) | ||
if response.status_code != 201 or response.json()["contacts"]["failure"]: | ||
logger.error("Brevo API: Some emails were not deleted", exc_info=True) | ||
|
||
def delete_users(self, emails): | ||
for chunk in chunks(emails, self.DELETE_BATCH_SIZE): | ||
if chunk: | ||
self._remove_contacts_from_list(chunk) | ||
|
||
|
||
class Command(BaseCommand): | ||
def add_arguments(self, parser): | ||
super().add_arguments(parser) | ||
parser.add_argument( | ||
"--wet-run", | ||
action="store_true", | ||
help="Enroll new users to a mailing list in Brevo", | ||
) | ||
|
||
@monitor(monitor_slug="new-users-to-mailjet") | ||
def handle(self, *args, wet_run, **options): | ||
client = BrevoClient() | ||
|
||
users = ( | ||
User.objects.filter(kind__in=[UserKind.PRESCRIBER, UserKind.EMPLOYER]) | ||
.filter( | ||
# Someday only filter on identity_provider ? | ||
Exists( | ||
EmailAddress.objects.filter( | ||
user_id=OuterRef("pk"), | ||
email=OuterRef("email"), | ||
primary=True, | ||
verified=True, | ||
) | ||
) | ||
| Q(identity_provider=IdentityProvider.INCLUSION_CONNECT), # IC verifies emails on its own | ||
is_active=True, | ||
) | ||
.order_by("email") | ||
) | ||
employers = list( | ||
users.filter(kind=UserKind.EMPLOYER) | ||
.filter( | ||
Exists( | ||
CompanyMembership.objects.filter( | ||
user_id=OuterRef("pk"), | ||
is_active=True, | ||
company__kind__in=SIAE_WITH_CONVENTION_KINDS, | ||
) | ||
) | ||
) | ||
.values("email", "first_name", "last_name", "date_joined") | ||
) | ||
|
||
all_prescribers = users.filter(kind=UserKind.PRESCRIBER) | ||
prescriber_membership_qs = PrescriberMembership.objects.filter(user_id=OuterRef("pk"), is_active=True) | ||
prescriber_ft_membership_qs = prescriber_membership_qs.filter(organization__kind=PrescriberOrganizationKind.PE) | ||
prescribers_ft = list( | ||
users.filter(Exists(prescriber_ft_membership_qs)).values("email", "first_name", "last_name", "date_joined") | ||
) | ||
prescribers = list( | ||
all_prescribers.filter(Exists(prescriber_membership_qs.filter(organization__is_authorized=True))) | ||
.exclude(Exists(prescriber_ft_membership_qs)) | ||
.values("email", "first_name", "last_name", "date_joined") | ||
) | ||
orienteurs = list( | ||
all_prescribers.exclude(Exists(prescriber_membership_qs.filter(organization__is_authorized=True))).values( | ||
"email", "first_name", "last_name", "date_joined" | ||
) | ||
) | ||
|
||
logger.info("SIAE users count: %d", len(employers)) | ||
logger.info("FT prescribers count: %d", len(prescribers_ft)) | ||
logger.info("Prescribers count: %d", len(prescribers)) | ||
logger.info("Orienteurs count: %d", len(orienteurs)) | ||
|
||
all_emails = [user["email"] for user in employers + prescribers_ft + prescribers + orienteurs] | ||
|
||
brevo_emails = client.all_emails() | ||
emails_to_delete = sorted(set(brevo_emails) - set(all_emails)) | ||
logger.info("Brevo emails count: %d", len(brevo_emails)) | ||
logger.info("Brevo emails to delete count: %d", len(emails_to_delete)) | ||
|
||
if wet_run: | ||
client.delete_users(emails_to_delete) | ||
|
||
for category, users in [ | ||
(UserCategory.EMPLOYEUR, employers), | ||
(UserCategory.PRECSRIBER_FT, prescribers_ft), | ||
(UserCategory.PRESCRIBER, prescribers), | ||
(UserCategory.ORIENTEUR, orienteurs), | ||
]: | ||
client.import_users(users, category) |
This file was deleted.
Oops, something went wrong.
Oops, something went wrong.