Skip to content
This repository has been archived by the owner on Jul 13, 2023. It is now read-only.

Commit

Permalink
feat: add flag to stop table rotation
Browse files Browse the repository at this point in the history
Closes #1172
  • Loading branch information
jrconlin committed Apr 16, 2018
1 parent acb26ea commit 6b4d760
Show file tree
Hide file tree
Showing 8 changed files with 169 additions and 27 deletions.
2 changes: 2 additions & 0 deletions autopush/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,8 @@ class AutopushConfig(object):
# DynamoDB endpoint override
aws_ddb_endpoint = attrib(default=None) # type: str

allow_table_rotation = attrib(default=True) # type: bool

def __attrs_post_init__(self):
"""Initialize the Settings object"""
# Setup hosts/ports/urls
Expand Down
89 changes: 71 additions & 18 deletions autopush/db.py
Original file line number Diff line number Diff line change
Expand Up @@ -137,17 +137,33 @@ def make_rotating_tablename(prefix, delta=0, date=None):


def create_rotating_message_table(
prefix="message", # type: str
delta=0, # type: int
date=None, # type: Optional[datetime.date]
read_throughput=5, # type: int
write_throughput=5, # type: int
boto_resource=None # type: DynamoDBResource
prefix="message", # type: str
delta=0, # type: int
date=None, # type: Optional[datetime.date]
read_throughput=5, # type: int
write_throughput=5, # type: int
boto_resource=None, # type: DynamoDBResource
):
# type: (...) -> Any # noqa
"""Create a new message table for webpush style message storage"""
tablename = make_rotating_tablename(prefix, delta, date)

return create_message_table(
tablename,
read_throughput=read_throughput,
write_throughput=write_throughput,
boto_resource=boto_resource
)


def create_message_table(
tablename, # type: str
read_throughput=5, # type: int
write_throughput=5, # type: int
boto_resource=None, # type: DynamoDBResource
):
# type: (...) -> Any # noqa
"""Create a new message table for webpush style message storage"""

try:
table = boto_resource.Table(tablename)
if table.table_status == 'ACTIVE':
Expand Down Expand Up @@ -466,6 +482,17 @@ def __init__(self, **kwargs):
def __getattr__(self, name):
return getattr(self._resource, name)

def get_latest_message_tablename(self, prefix="message"):
# type: (Optional[str]) -> str # noqa
"""Fetches the name of the last message table"""
response = filter(
lambda name: name.lower().startswith(prefix),
self._resource.meta.client.list_tables().get("TableNames"))
if not len(response):
return prefix
response.sort()
return response[-1]


class DynamoDBTable(threading.local):
def __init__(self, ddb_resource, *args, **kwargs):
Expand All @@ -478,8 +505,9 @@ def __getattr__(self, name):

class Message(object):
"""Create a Message table abstraction on top of a DynamoDB Table object"""
def __init__(self, tablename, metrics=None, boto_resource=None,
max_ttl=MAX_EXPIRY):
def __init__(self, tablename=None, metrics=None, boto_resource=None,
max_ttl=MAX_EXPIRY,
table_base_string="message_"):
# type: (str, IMetrics, DynamoDBResource, int) -> None
"""Create a new Message object
Expand All @@ -488,10 +516,13 @@ def __init__(self, tablename, metrics=None, boto_resource=None,
:param boto_resource: DynamoDBResource for thread
"""
self.tablename = tablename
self._max_ttl = max_ttl
self.resource = boto_resource
if tablename is None:
tablename = boto_resource.get_latest_message_tablename(
table_base_string)
self.table = DynamoDBTable(self.resource, tablename)
self.tablename = tablename

def table_status(self):
return self.table.table_status
Expand Down Expand Up @@ -995,19 +1026,22 @@ class DatabaseManager(object):

router = attrib(default=None) # type: Optional[Router]
message_tables = attrib(default=Factory(list)) # type: List[str]
current_msg_month = attrib(init=False) # type: Optional[str]
current_msg_month = attrib(default=None, init=False) # type: Optional[str]
current_month = attrib(init=False) # type: Optional[int]
_message = attrib(default=None) # type: Optional[Message]
allow_table_rotation = attrib(default=True) # type: Optional[bool]
# for testing:

def __attrs_post_init__(self):
"""Initialize sane defaults"""
today = datetime.date.today()
self.current_month = today.month
self.current_msg_month = make_rotating_tablename(
self._message_conf.tablename,
date=today
)
if self.allow_table_rotation:
today = datetime.date.today()
self.current_month = today.month
self.current_msg_month = make_rotating_tablename(
self._message_conf.tablename,
date=today
)

if not self.resource:
self.resource = DynamoDBResource()

Expand All @@ -1027,6 +1061,7 @@ def from_config(cls,
message_conf=conf.message_table,
metrics=metrics,
resource=resource,
allow_table_rotation=conf.allow_table_rotation,
**kwargs
)

Expand Down Expand Up @@ -1054,7 +1089,8 @@ def setup_tables(self):
self.create_initial_message_tables()
self._message = Message(self.current_msg_month,
self.metrics,
boto_resource=self.resource)
boto_resource=self.resource,
table_base_string=self._message_conf.tablename)

@property
def message(self):
Expand All @@ -1078,6 +1114,21 @@ def create_initial_message_tables(self):
an entry for tomorrow, if tomorrow is a new month.
"""
if not self.allow_table_rotation:
tablename = self.resource.get_latest_message_tablename(
prefix=self._message_conf.tablename
)
if not table_exists(tablename,
boto_resource=self.resource):
create_message_table(
tablename=tablename,
read_throughput=self._message_conf.read_throughput,
write_throughput=self._message_conf.write_throughput,
boto_resource=self.resource
)
self.message_tables.append(tablename)
return

mconf = self._message_conf
today = datetime.date.today()
last_month = get_rotating_message_tablename(
Expand Down Expand Up @@ -1116,6 +1167,8 @@ def update_rotating_tables(self):
table objects on the settings object.
"""
if not self.allow_table_rotation:
returnValue(False)
mconf = self._message_conf
today = datetime.date.today()
tomorrow = self._tomorrow()
Expand Down
5 changes: 4 additions & 1 deletion autopush/diagnostic_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,10 @@ def run(self):
print("\n")

if "current_month" in rec:
chans = Message(rec["current_month"],
month = None
if self._conf.allow_table_rotation:
month = rec["current_month"]
chans = Message(month,
boto_resource=self.db.resource).all_channels(uaid)
print("Channels in message table:")
self._pp.pprint(chans)
Expand Down
26 changes: 25 additions & 1 deletion autopush/tests/test_db.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,19 @@ def test_init_with_resources(self):
assert dm.resource is not None
assert isinstance(dm.resource, DynamoDBResource)

def test_init_with_no_rotate(self):
fake_conf = Mock()
fake_conf.allow_table_rotation = False
fake_conf.message_table = Mock()
fake_conf.message_table.tablename = "message_test"
fake_conf.message_table.read_throughput = 5
fake_conf.message_table.write_throughput = 5
dm = DatabaseManager.from_config(
fake_conf,
resource=autopush.tests.boto_resource)
dm.create_initial_message_tables()
assert dm.current_msg_month is None


class DdbResourceTest(unittest.TestCase):
@patch("boto3.resource")
Expand Down Expand Up @@ -205,10 +218,21 @@ def test_normalize_id(self):
class MessageTestCase(unittest.TestCase):
def setUp(self):
self.resource = autopush.tests.boto_resource
table = get_rotating_message_tablename(boto_resource=self.resource)
table = get_rotating_message_tablename(
prefix="message_int_test",
boto_resource=self.resource)
self.real_table = table
self.uaid = str(uuid.uuid4())

def test_non_rotating_tables(self):
message_tablename = "message_int_test"
message = Message(None, SinkMetrics(),
boto_resource=self.resource,
table_base_string=message_tablename)
table_name = self.resource.get_latest_message_tablename(
prefix=message_tablename)
assert message.tablename == table_name

def test_register(self):
chid = str(uuid.uuid4())

Expand Down
14 changes: 14 additions & 0 deletions autopush/tests/test_webpush_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -466,6 +466,20 @@ def test_migrate_user(self):
assert item is not None
assert len(channels) == 3

def test_no_migrate(self):
self.conf.allow_table_rotation = False
self.conf.message_table.tablename = "message_int_test"
self.db = db = DatabaseManager.from_config(
self.conf,
resource=autopush.tests.boto_resource
)
assert self.db.allow_table_rotation is False
db.setup_tables()
tablename = autopush.tests.boto_resource.get_latest_message_tablename(
prefix="message_int_test"
)
assert db.message.tablename == tablename


class TestRegisterProcessor(BaseSetup):

Expand Down
36 changes: 36 additions & 0 deletions autopush/tests/test_z_main.py
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,42 @@ def check_tables(result):
d.addBoth(lambda x: e.callback(True))
return e

def test_no_rotation(self):
today = datetime.date.today()
next_month = today.month + 1
next_year = today.year
if next_month > 12: # pragma: nocover
next_month = 1
next_year += 1
tomorrow = datetime.datetime(year=next_year,
month=next_month,
day=1)
conf = AutopushConfig(
hostname="example.com",
resolve_hostname=True,
allow_table_rotation=False
)
resource = autopush.tests.boto_resource
db = DatabaseManager.from_config(
conf,
resource=resource)
db._tomorrow = Mock(return_value=tomorrow)
db.create_initial_message_tables()
assert len(db.message_tables) == 1
assert db.message_tables[0] == resource.get_latest_message_tablename(
prefix=conf.message_table.tablename
)

def check_tables(result):
assert len(db.message_tables) == 1
assert db.message_tables[0] == \
resource.get_latest_message_tablename(
prefix=conf.message_table.tablename
)
dd = db.update_rotating_tables()
dd.addCallback(check_tables)
return dd


class ConnectionMainTestCase(unittest.TestCase):
def setUp(self):
Expand Down
21 changes: 14 additions & 7 deletions autopush/webpush_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -363,6 +363,11 @@ def metrics(self):
def process(self, command):
raise NotImplementedError()

def get_month(self, command):
if not self.conf.allow_table_rotation:
return None
return command.message_month


class HelloCommand(ProcessorCommand):
def process(self, hello):
Expand Down Expand Up @@ -481,7 +486,7 @@ def process(self, command):
def _check_storage(self, command):
timestamp = None
messages = []
message = Message(command.message_month,
message = Message(self.get_month(command),
boto_resource=self.db.resource)
if command.include_topic:
timestamp, messages = message.fetch_messages(
Expand Down Expand Up @@ -516,7 +521,7 @@ def _check_storage(self, command):
class IncrementStorageCommand(ProcessorCommand):
def process(self, command):
# type: (IncStoragePosition) -> IncStoragePositionResponse
message = Message(command.message_month,
message = Message(self.get_month(command),
boto_resource=self.db.resource)
message.update_last_message_read(command.uaid, command.timestamp)
return IncStoragePositionResponse()
Expand All @@ -526,7 +531,7 @@ class DeleteMessageCommand(ProcessorCommand):
def process(self, command):
# type: (DeleteMessage) -> DeleteMessageResponse
notif = command.message.to_WebPushNotification()
message = Message(command.message_month,
message = Message(self.get_month(command),
boto_resource=self.db.resource)
message.delete_message(notif)
return DeleteMessageResponse()
Expand All @@ -543,7 +548,9 @@ class MigrateUserCommand(ProcessorCommand):
def process(self, command):
# type: (MigrateUser) -> MigrateUserResponse
# Get the current channels for this month
message = Message(command.message_month,
if not self.conf.allow_table_rotation:
return MigrateUserResponse(message_month=None)
message = Message(self.get_month(command),
boto_resource=self.db.resource)
_, channels = message.all_channels(command.uaid.hex)

Expand All @@ -565,7 +572,7 @@ def process(self, command):
class StoreMessagesUserCommand(ProcessorCommand):
def process(self, command):
# type: (StoreMessages) -> StoreMessagesResponse
message = Message(command.message_month,
message = Message(self.get_month(command),
boto_resource=self.db.resource)
for m in command.messages:
if "topic" not in m:
Expand Down Expand Up @@ -620,7 +627,7 @@ def process(self, command):
command.channel_id,
command.key
)
message = self.db.message_table(command.message_month)
message = self.db.message_table(self.get_month(command))
try:
message.register_channel(command.uaid.hex,
command.channel_id)
Expand Down Expand Up @@ -669,7 +676,7 @@ def process(self,
if not valid:
return UnregisterErrorResponse(error_msg=msg)

message = Message(command.message_month,
message = Message(self.get_month(command),
boto_resource=self.db.resource)
try:
message.unregister_channel(command.uaid.hex, command.channel_id)
Expand Down
3 changes: 3 additions & 0 deletions configs/autopush_shared.ini.sample
Original file line number Diff line number Diff line change
Expand Up @@ -132,3 +132,6 @@ endpoint_port = 8082
; e.g {"firefox":{"cert":"certs/main.cert","key":"certs/main.key","topic":"com.mozilla.org.Firefox","max_retry":2},"beta":{"cert":"certs/beta.cert","key":"certs/beta.key","topic":"com.mozilla.org.FirefoxBeta"}}
#apns_creds =

; With TTL implemented, message table rotation is no longer required.
; This flag determines if table rotation should be allowed to continue:
#allow_table_rotation = true

0 comments on commit 6b4d760

Please sign in to comment.