Skip to content
This repository has been archived by the owner on Feb 8, 2018. It is now read-only.

Persist email messages #4572

Merged
merged 3 commits into from
Aug 19, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
79 changes: 40 additions & 39 deletions gratipay/email.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
from jinja2 import Environment
from markupsafe import escape as htmlescape

from gratipay.exceptions import Throttled
from gratipay.exceptions import NoEmailAddress, Throttled
from gratipay.models.participant import Participant
from gratipay.utils import find_files, i18n

Expand All @@ -38,6 +38,7 @@ def __init__(self, env, db, tell_sentry, root):
self.tell_sentry = tell_sentry
self.sleep_for = env.email_queue_sleep_for
self.allow_up_to = env.email_queue_allow_up_to
self.log_every = env.email_queue_log_metrics_every

templates = {}
templates_dir = os.path.join(root, 'emails')
Expand Down Expand Up @@ -75,13 +76,18 @@ def put(self, to, template, _user_initiated=True, **context):
"""
with self.db.get_cursor() as cursor:
cursor.run("""
INSERT INTO email_queue
INSERT INTO email_messages
(participant, spt_name, context, user_initiated)
VALUES (%s, %s, %s, %s)
""", (to.id, template, pickle.dumps(context), _user_initiated))
if _user_initiated:
n = cursor.one('SELECT count(*) FROM email_queue '
'WHERE participant=%s AND user_initiated', (to.id,))
n = cursor.one("""
SELECT count(*)
FROM email_messages
WHERE participant=%s
AND result is null
AND user_initiated
""", (to.id,))
if n > self.allow_up_to:
raise Throttled()

Expand All @@ -91,9 +97,9 @@ def flush(self):
"""
fetch_messages = lambda: self.db.all("""
SELECT *
FROM email_queue
WHERE not dead
ORDER BY id ASC
FROM email_messages
WHERE result is null
ORDER BY ctime ASC
LIMIT 60
""")
nsent = 0
Expand All @@ -103,38 +109,30 @@ def flush(self):
break
for rec in messages:
try:
r = self._flush_one(rec)
except:
self.db.run("UPDATE email_queue SET dead=true WHERE id = %s", (rec.id,))
raise
self.db.run("DELETE FROM email_queue WHERE id = %s", (rec.id,))
if r == 1:
sleep(self.sleep_for)
nsent += r
message = self._prepare_email_message_for_ses(rec)
result = self._mailer.send_email(**message)
remote_message_id = result['MessageId'] # let KeyErrors go to Sentry
except Exception as exc:
self._store_result(rec.id, repr(exc), None)
raise # we want to see this in Sentry
self._store_result(rec.id, '', remote_message_id)
nsent += 1
sleep(self.sleep_for)
return nsent


def _flush_one(self, rec):
"""Send an email message using the underlying ``_mailer``.

:param Record rec: a database record from the ``email_queue`` table
:return int: the number of emails sent (0 or 1)

"""
message = self._prepare_email_message_for_ses(rec)
if message is None:
return 0 # Not sent
self._mailer.send_email(**message)
return 1 # Sent
def _store_result(self, message_id, result, remote_message_id):
self.db.run("UPDATE email_messages SET result=%s, remote_message_id=%s "
"WHERE id=%s", (result, remote_message_id, message_id))


def _prepare_email_message_for_ses(self, rec):
"""Prepare an email message for delivery via Amazon SES.

:param Record rec: a database record from the ``email_queue`` table
:param Record rec: a database record from the ``email_messages`` table

:returns: ``None`` if we can't find an email address to send to
:returns: ``dict`` if we can find an email address to send to
:raises: ``NoEmailAddress`` if we can't find an email address to send to

We look for an email address to send to in two places:

Expand All @@ -156,7 +154,7 @@ def _prepare_email_message_for_ses(self, rec):
context.setdefault('include_unsubscribe', True)
email = context.setdefault('email', to.email_address)
if not email:
return None
raise NoEmailAddress()
langs = i18n.parse_accept_lang(to.email_lang or 'en')
locale = i18n.match_lang(langs)
i18n.add_helpers_to_context(self.tell_sentry, context, locale)
Expand Down Expand Up @@ -188,15 +186,16 @@ def render(t, context):


def log_metrics(self, _print=print):
ndead = self.db.one('SELECT COUNT(*) FROM email_queue WHERE dead')
ntotal = self.db.one('SELECT COUNT(*) FROM email_queue')
_print('count#email_queue_dead=%d count#email_queue_total=%d' % (ndead, ntotal))


def purge(self):
"""Remove all messages from the queue.
"""
self.db.run('DELETE FROM email_queue')
stats = self.db.one("""
SELECT count(CASE WHEN result = '' THEN 1 END) AS sent
, count(CASE WHEN result > '' THEN 1 END) AS failed
, count(CASE WHEN result is null THEN 1 END) AS pending
FROM email_messages
WHERE ctime > now() - %s::interval
""", ('{} seconds'.format(self.log_every),), back_as=dict)
prefix = 'count#email_queue'
variables = ('sent', 'failed', 'pending')
_print(' '.join('{}_{}={}'.format(prefix, v, stats[v]) for v in variables))


jinja_env = Environment()
Expand Down Expand Up @@ -242,3 +241,5 @@ def send_email(self, **email):
p(' ', line)
p()
p('-'*78)

return {'MessageId': 'deadbeef'} # simulate a remote message id
3 changes: 3 additions & 0 deletions gratipay/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,9 @@ def lazy_body(self, _):
return _("You've reached the maximum number of email addresses we allow.")


class NoEmailAddress(Exception):
pass

class Throttled(LocalizedErrorResponse):
def lazy_body(self, _):
return _("You've initiated too many emails too quickly. Please try again in a minute or two.")
Expand Down
2 changes: 1 addition & 1 deletion gratipay/models/package/emails.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ def classify_emails_for_participant(self, participant):
other_verified = self.db.all('''

SELECT address
FROM emails
FROM email_addresses
WHERE verified is true
AND participant_id != %s
AND address = ANY((SELECT emails FROM packages WHERE id=%s)::text[])
Expand Down
12 changes: 6 additions & 6 deletions gratipay/models/participant/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -362,7 +362,7 @@ def clear_personal_information(self, cursor):
AND is_member IS true
);

DELETE FROM emails WHERE participant_id = %(participant_id)s;
DELETE FROM email_addresses WHERE participant_id = %(participant_id)s;
DELETE FROM statements WHERE participant=%(participant_id)s;
DELETE FROM participant_identities WHERE participant_id=%(participant_id)s;

Expand Down Expand Up @@ -1104,17 +1104,17 @@ def take_over(self, account, have_confirmation=False):

MERGE_EMAIL_ADDRESSES = """

WITH emails_to_keep AS (
WITH email_addresses_to_keep AS (
SELECT DISTINCT ON (address) id
FROM emails
FROM email_addresses
WHERE participant_id IN (%(dead)s, %(live)s)
ORDER BY address, verification_end, verification_start DESC
)
DELETE FROM emails
DELETE FROM email_addresses
WHERE participant_id IN (%(dead)s, %(live)s)
AND id NOT IN (SELECT id FROM emails_to_keep);
AND id NOT IN (SELECT id FROM email_addresses_to_keep);

UPDATE emails
UPDATE email_addresses
SET participant_id = %(live)s
WHERE participant_id = %(dead)s;

Expand Down
25 changes: 13 additions & 12 deletions gratipay/models/participant/email.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,13 +41,13 @@ def __repr__(self):
class Email(object):
"""Participants may associate email addresses with their account.

Email addresses are stored in an ``emails`` table in the database, which
holds the addresses themselves as well as info related to address
Email addresses are stored in an ``email_addresses`` table in the database,
which holds the addresses themselves as well as info related to address
verification. While a participant may have multiple email addresses on
file, verified or not, only one will be the *primary* email address: the
one also recorded in ``participants.email_address``. It's a bug for the
primary address not to be verified, or for an address to be in
``participants.email_address`` but not also in ``emails``.
``participants.email_address`` but not also in ``email_addresses``.

Having a verified email is a prerequisite for certain other features on
Gratipay, such as linking a PayPal account, or filing a national identity.
Expand Down Expand Up @@ -108,7 +108,7 @@ def validate_email_verification_request(self, c, email, *packages):

owner_id = c.one("""
SELECT participant_id
FROM emails
FROM email_addresses
WHERE address = %(email)s
AND verified IS true
""", dict(email=email))
Expand Down Expand Up @@ -160,7 +160,7 @@ def get_email_verification_nonce(self, c, email):
"""Given a cursor and email address, return a verification nonce.
"""
nonce = str(uuid.uuid4())
existing = c.one( 'SELECT * FROM emails WHERE address=%s AND participant_id=%s'
existing = c.one( 'SELECT * FROM email_addresses WHERE address=%s AND participant_id=%s'
, (email, self.id)
) # can't use eafp here because of cursor error handling
# XXX I forget what eafp is. :(
Expand All @@ -170,7 +170,8 @@ def get_email_verification_nonce(self, c, email):
# Not in the table yet. This should throw an IntegrityError if the
# address is verified for a different participant.

c.run( "INSERT INTO emails (participant_id, address, nonce) VALUES (%s, %s, %s)"
c.run( "INSERT INTO email_addresses (participant_id, address, nonce) "
"VALUES (%s, %s, %s)"
, (self.id, email, nonce)
)
else:
Expand All @@ -181,7 +182,7 @@ def get_email_verification_nonce(self, c, email):
if existing.nonce:
c.run('DELETE FROM claims WHERE nonce=%s', (existing.nonce,))
c.run("""
UPDATE emails
UPDATE email_addresses
SET nonce=%s
, verification_start=now()
WHERE participant_id=%s
Expand Down Expand Up @@ -299,15 +300,15 @@ def save_email_address(self, cursor, address):

"""
cursor.run("""
UPDATE emails
UPDATE email_addresses
SET verified=true, verification_end=now(), nonce=NULL
WHERE participant_id=%s
AND address=%s
AND verified IS NULL
""", (self.id, address))
cursor.run("""
DELETE
FROM emails
FROM email_addresses
WHERE participant_id != %s
AND address=%s
""", (self.id, address))
Expand All @@ -324,7 +325,7 @@ def get_email(self, address, cursor=None, and_lock=False):
:returns: a database record (a named tuple)

"""
sql = 'SELECT * FROM emails WHERE participant_id=%s AND address=%s'
sql = 'SELECT * FROM email_addresses WHERE participant_id=%s AND address=%s'
if and_lock:
sql += ' FOR UPDATE'
return (cursor or self.db).one(sql, (self.id, address))
Expand All @@ -335,7 +336,7 @@ def get_emails(self, cursor=None):
"""
return (cursor or self.db).all("""
SELECT *
FROM emails
FROM email_addresses
WHERE participant_id=%s
ORDER BY id
""", (self.id,))
Expand All @@ -359,7 +360,7 @@ def remove_email(self, address):
, 'participant'
, dict(id=self.id, action='remove', values=dict(email=address))
)
c.run("DELETE FROM emails WHERE participant_id=%s AND address=%s",
c.run("DELETE FROM email_addresses WHERE participant_id=%s AND address=%s",
(self.id, address))


Expand Down
10 changes: 5 additions & 5 deletions gratipay/models/participant/packages.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,10 @@ def get_packages_for_claiming(self, manager):
"""
return self.db.all('''

WITH verified_emails AS (
WITH verified_email_addresses AS (
SELECT e.address
, e.address = p.email_address is_primary
FROM emails e
FROM email_addresses e
LEFT JOIN participants p
ON p.id = e.participant_id
WHERE e.participant_id=%s
Expand All @@ -36,12 +36,12 @@ def get_packages_for_claiming(self, manager):
SELECT pkg.*::packages package
, p.*::participants claimed_by
, (SELECT is_primary
FROM verified_emails
FROM verified_email_addresses
WHERE address = ANY(emails)
ORDER BY is_primary DESC, address
LIMIT 1) email_address_is_primary
, (SELECT address
FROM verified_emails
FROM verified_email_addresses
WHERE address = ANY(emails)
ORDER BY is_primary DESC, address
LIMIT 1) email_address
Expand All @@ -53,7 +53,7 @@ def get_packages_for_claiming(self, manager):
LEFT JOIN participants p
ON t.owner = p.username
WHERE package_manager=%s
AND pkg.emails && array(SELECT address FROM verified_emails)
AND pkg.emails && array(SELECT address FROM verified_email_addresses)
ORDER BY email_address_is_primary DESC
, email_address ASC
, pkg.name ASC
Expand Down
34 changes: 19 additions & 15 deletions gratipay/testing/email.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
# -*- coding: utf-8 -*-
from __future__ import absolute_import, division, print_function, unicode_literals

import mock

from gratipay.testing import Harness


Expand Down Expand Up @@ -37,33 +35,39 @@ def get_last_email(self):


class QueuedEmailHarness(_AbstractEmailHarness):
"""An email harness that pulls from the ``email_queue`` table.
"""An email harness that pulls from the ``email_messages`` table.
"""

def _get_last_email(self):
rec = self.db.one('SELECT * FROM email_queue ORDER BY id DESC LIMIT 1')
rec = self.db.one('SELECT * FROM email_messages ORDER BY ctime DESC LIMIT 1')
return self.app.email_queue._prepare_email_message_for_ses(rec)

def count_email_messages(self):
return self.db.one('SELECT count(*) FROM email_queue')
return self.db.one('SELECT count(*) FROM email_messages WHERE result is null')


class SentEmailHarness(_AbstractEmailHarness):
"""An email harness that mocks ``_mailer.send_email`` to ``get_last_email``
post-queue.
"""An email harness that patches ``_mailer.send_email`` to ``get_last_email``
after running through the email queue machinery.
"""

def setUp(self):
_AbstractEmailHarness.setUp(self)
self.mailer_patcher = mock.patch.object(self.app.email_queue._mailer, 'send_email')
self.mailer = self.mailer_patcher.start()
self.addCleanup(self.mailer_patcher.stop)
sleep_patcher = mock.patch('gratipay.application.email.sleep')
sleep_patcher.start()
self.addCleanup(sleep_patcher.stop)
self.__messages = []

def send_email(**message):
self.__messages.append(message)
return {'MessageId': 'deadbeef'}

self.__send_email = self.app.email_queue._mailer.send_email
self.app.email_queue._mailer.send_email = send_email

def tearDown(self):
self.app.email_queue._mailer.send_email = self.__send_email
_AbstractEmailHarness.tearDown(self)

def _get_last_email(self):
return self.mailer.call_args[1]
return self.__messages[-1]

def count_email_messages(self):
return self.mailer.call_count
return len(self.__messages)
Loading