Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

controller: don't request new backup if we're already at the maximum #141

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 23 additions & 13 deletions myhoard/controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,14 +81,6 @@ class RestoreOptions(TypedDict):
target_time_approximate_ok: bool


def sort_completed_backups(backups: List[Backup]) -> List[Backup]:
    """Return only the backups that have completed, ordered by completion time.

    Backups whose ``completed_at`` is falsy (``None`` / unset) are dropped;
    the remainder are sorted ascending by their ``completed_at`` timestamp.
    """
    finished = [backup for backup in backups if backup["completed_at"]]
    finished.sort(key=lambda backup: backup["completed_at"])
    return finished


class Controller(threading.Thread):
"""Main logic controller for the service. This drives the individual handlers like
BackupStream, BinlogScanner and RestoreCoordinator as well as provides state info
Expand Down Expand Up @@ -268,6 +260,14 @@ def mark_backup_requested(
def mode(self) -> Mode:
return self.state["mode"]

@property
def completed_backups(self) -> List[Backup]:
    """Backups from current state that have finished, sorted by completion time.

    Entries with a falsy ``completed_at`` are excluded; the rest are returned
    in ascending ``completed_at`` order (oldest first).
    """
    finished = [backup for backup in self.state["backups"] if backup["completed_at"]]
    finished.sort(key=lambda backup: backup["completed_at"])
    return finished

def restore_backup(
self,
*,
Expand Down Expand Up @@ -958,15 +958,14 @@ def _extend_binlog_stream_list(self):
assert self.restore_coordinator is not None
if not self.restore_coordinator.can_add_binlog_streams():
return
backups = sort_completed_backups(self.state["backups"])
backups = self.completed_backups
# If most recent current backup is not in the list of backups being restored then we're probably
# restoring some old backup and don't want to automatically get latest changes
if not any(bs["stream_id"] == backups[-1]["stream_id"] for bs in self.restore_coordinator.binlog_streams):
return

old_backups = [{"site": backup["site"], "stream_id": backup["stream_id"]} for backup in backups]
self._refresh_backups_list()
backups = sort_completed_backups(self.state["backups"])
new_backups: List[BinlogStream] = [{"site": backup["site"], "stream_id": backup["stream_id"]} for backup in backups]
if old_backups == new_backups:
return
Expand Down Expand Up @@ -1152,6 +1151,12 @@ def _mark_periodic_backup_requested_if_interval_exceeded(self):
normalized_backup_time = self._current_normalized_backup_timestamp()
most_recent_scheduled = None
last_normalized_backup_time = None
last_completed_backup_time = self.completed_backups[-1]["completed_at"] if self.completed_backups else None

# If we already have a recent enough backup, skip for now.
if last_completed_backup_time and datetime.datetime.fromtimestamp(last_completed_backup_time).isoformat() > normalized_backup_time:
return

if self.backup_streams:
normalized_backup_times = [
stream.state["normalized_backup_time"]
Expand Down Expand Up @@ -1182,6 +1187,12 @@ def _mark_periodic_backup_requested_if_interval_exceeded(self):
)
and (not most_recent_scheduled or time.time() - most_recent_scheduled >= half_backup_interval_s)
):
# If we already have more than the max, skip this one
# We allow one over the max just to handle rotation.
if (length := len(self.completed_backups)) > (max_count := self.backup_settings["backup_count_max"]):
self.log.info("Skipping automated backup request as we already have our maximum count of backups (%d >= %d)", length, max_count)
return

self.log.info(
"New normalized time %r differs from previous %r, adding new backup request",
normalized_backup_time,
Expand Down Expand Up @@ -1220,14 +1231,13 @@ def _process_removed_binlogs(self, binlogs):
stream.remove_binlogs(binlogs)

def _purge_old_backups(self):
purgeable = [backup for backup in self.state["backups"] if backup["completed_at"]]
purgeable = self.completed_backups
if len(purgeable) <= self.backup_settings["backup_count_min"]:
return

# For simplicity only ever drop one backup here. This function
# is called repeatedly so if there are for any reason more backups
# to drop they will be dropped soon enough
purgeable = sort_completed_backups(purgeable)
backup = purgeable[0]
if not backup["closed_at"]:
return
Expand Down Expand Up @@ -1610,7 +1620,7 @@ def _switch_basebackup_if_possible(self):
# We're trying to restore a backup but that keeps on failing in the basebackup restoration phase.
# If we have an older backup available try restoring that and play back all binlogs so that the
# system should end up in the exact same state eventually.
backups = sort_completed_backups(self.state["backups"])
backups = self.completed_backups
current_stream_id = self.state["restore_options"]["stream_id"]
earlier_backup = None
for backup in backups:
Expand Down
42 changes: 42 additions & 0 deletions test/test_controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -835,6 +835,48 @@ def maybe_flush_binlog():
assert 8 <= len(seen_backups) <= 13


def test_scheduled_backup_is_skipped_when_at_max(master_controller):
    """Periodic backups must stop being requested once backup_count_max is reached.

    One backup beyond the maximum is allowed so that rotation can proceed
    (the oldest gets purged once a newer one completes), so the completed
    count must stay within [0, backup_count_max + 1] at every observation.
    """
    mcontroller, master = master_controller
    # Backup every 3 seconds
    mcontroller.backup_settings["backup_interval_minutes"] = 0.05
    # Never delete backups if we don't have at least 2 no matter how old they are
    mcontroller.backup_settings["backup_count_min"] = 2
    # Delete backups if there are more than this even if the backup to delete is newer than max age
    mcontroller.backup_settings["backup_count_max"] = 3
    # Max age way higher than we'll get
    mcontroller.backup_settings["backup_age_days_max"] = 1

    mcontroller.switch_to_active_mode()
    mcontroller.start()

    seen_backups = set()
    highest_backup_count = 0
    last_flush = time.monotonic()

    def maybe_flush_binlog():
        # Flush binlogs periodically so the controller always has fresh
        # binlog activity to scan while we observe backup counts.
        nonlocal last_flush
        if time.monotonic() - last_flush > 0.2:
            with mysql_cursor(**master.connect_options) as cursor:
                cursor.execute("FLUSH BINARY LOGS")
            last_flush = time.monotonic()

    # Watch for 30 seconds and verify the completed backup count never exceeds
    # backup_count_max + 1 at any point (previously this bound was only stated
    # in a comment but never actually asserted inside the loop).
    start_time = time.monotonic()
    while time.monotonic() - start_time < 30:
        maybe_flush_binlog()
        for backup in mcontroller.state["backups"]:
            seen_backups.add(backup["stream_id"])
        completed_backups = [backup for backup in mcontroller.state["backups"] if backup["completed_at"]]
        highest_backup_count = max(highest_backup_count, len(completed_backups))
        # At most one backup over the configured maximum, at every observation
        assert len(completed_backups) <= 4
        time.sleep(0.1)

    # The rotation extra means we should have peaked exactly at max + 1
    assert highest_backup_count == 4

    # We waited for 30 seconds altogether and backup interval is 3 seconds. There should be more than our max
    # backups taken, as we should have allowed one extra to rotate through.
    assert 4 < len(seen_backups)


def test_manual_backup_creation(master_controller):
mcontroller = master_controller[0]
# Never delete backups if we don't have at least 2 no matter how old they are
Expand Down